Skip to content

Instantly share code, notes, and snippets.

@deepakkashyap3013
Created May 29, 2025 15:16
Show Gist options
  • Select an option

  • Save deepakkashyap3013/2f898f52ee30632ddc6a5cfdefff24be to your computer and use it in GitHub Desktop.

Select an option

Save deepakkashyap3013/2f898f52ee30632ddc6a5cfdefff24be to your computer and use it in GitHub Desktop.
GraphRAG with Neo4j + Pinecone vector DB (hybrid solution)
import csv
import hashlib
import json
import logging
import os
import re
import uuid
from dataclasses import dataclass, fields
from typing import Any, Dict, List, Tuple

import openai
import pandas as pd
from dotenv import load_dotenv
from langchain_core.documents import Document
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from neo4j import GraphDatabase
from pinecone import Pinecone, ServerlessSpec
from retry import retry
# Pull settings from a local .env file into the process environment, then give
# the module-level openai client (used directly by PineconeVectorStore.embed)
# the API key.
load_dotenv(dotenv_path='.env')
openai.api_key = os.getenv('OPENAI_API_KEY')

# Script-wide logging: INFO and above, timestamped, with the logger name.
_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
_LOG_DATEFMT = "%Y-%m-%d %H:%M:%S"
logging.basicConfig(format=_LOG_FORMAT, datefmt=_LOG_DATEFMT, level=logging.INFO)
logger = logging.getLogger(__name__)
# Node labels that Neo4jGraphStore.setup_constraints gives a uniqueness
# constraint on ``id`` when the store is created.
NODES = ["Person", "Company", "Topic", "Founder"]
@dataclass
class Entity:
    """A named item (person, company, topic, ...) pulled out of transcript text.

    ``type`` is the free-form category string supplied by the extractor; the
    extraction prompt in this file asks for PERSON, COMPANY or TOPIC.
    """

    name: str  # entity name as reported by the extractor
    type: str  # category label, e.g. PERSON / COMPANY / TOPIC
    context: str  # supporting snippet or short description
    confidence: float = 0.8  # extractor confidence; 0.8 when not supplied
@dataclass
class Relationship:
    """A directed, typed link ``source -[relation_type]-> target`` between two entities.

    ``source`` and ``target`` hold entity *names* (not object references), as
    returned by the extractor.
    """

    source: str  # name of the entity the relationship starts from
    target: str  # name of the entity the relationship points to
    relation_type: str  # relationship label, e.g. WORKS_WITH / DISCUSSES / LEADS
    context: str  # supporting snippet or short description
    confidence: float = 0.8  # extractor confidence; 0.8 when not supplied
class ConfigManager:
    """Validate and expose configuration held in environment variables.

    Values are read from the process environment on every property access.
    Construction raises ``ValueError`` if any required variable is unset or
    set to the empty string.
    """

    # Source environment variable for each key returned by ``neo4j_config``.
    _NEO4J_ENV = {
        'uri': 'NEO4J_URI',
        'user': 'NEO4J_USERNAME',
        'password': 'NEO4J_PASSWORD',
        'database': 'NEO4J_DATABASE',
    }

    def __init__(self):
        # Every variable listed here must be present and non-empty.
        # NEO4J_DATABASE is deliberately optional.
        self.required_keys = [
            'OPENAI_API_KEY',
            'PINECONE_API_KEY',
            'PINECONE_HOST',
            'NEO4J_URI',
            'NEO4J_USERNAME',
            'NEO4J_PASSWORD',
        ]
        self.validate_environment()

    def validate_environment(self):
        """Raise ``ValueError`` naming every required variable that is unset or empty."""
        missing_keys = [name for name in self.required_keys if not os.environ.get(name)]
        if missing_keys:
            raise ValueError(f"Missing required environment variables: {missing_keys}")

    @property
    def openai_api_key(self) -> str:
        """Value of ``OPENAI_API_KEY``."""
        return os.environ.get('OPENAI_API_KEY')

    @property
    def pinecone_config(self) -> Dict[str, str]:
        """Pinecone credentials as ``{'api_key': ..., 'host': ...}``."""
        return dict(
            api_key=os.environ.get('PINECONE_API_KEY'),
            host=os.environ.get('PINECONE_HOST'),
        )

    @property
    def neo4j_config(self) -> Dict[str, str]:
        """Neo4j settings with keys uri/user/password/database (database may be ``None``)."""
        return {key: os.environ.get(var) for key, var in self._NEO4J_ENV.items()}
@retry(tries=10, delay=10)
def set_uniquness_constraints(tx, node):
    """Create, if absent, a uniqueness constraint on ``id`` for nodes labelled *node*.

    Meant to be passed to ``session.execute_write``. The ``@retry`` decorator
    re-invokes the function on any exception, up to 10 attempts, 10 seconds apart.

    Args:
        tx: Neo4j transaction supplied by ``execute_write``.
        node: Label to constrain; it is interpolated into the query text
            unquoted, so it must be a trusted identifier (e.g. from NODES).
    """
    # NOTE(review): nodes written elsewhere in this file are merged on
    # (name, namespace) and never get an ``id`` property, so this constraint
    # currently enforces nothing — confirm whether (name, namespace) was meant.
    tx.run(f"CREATE CONSTRAINT IF NOT EXISTS FOR\n(n:{node}) REQUIRE n.id IS UNIQUE;")
class EntityExtractor:
    """Extract entities and relationships from transcript text using an LLM.

    The model is asked for a JSON object with ``entities`` and
    ``relationships`` lists. Each item becomes an :class:`Entity` or a
    :class:`Relationship`. Extraction is best-effort: failures are logged and
    produce empty results instead of raising.
    """

    # Chat models often wrap JSON answers in a ```json ... ``` fence even when
    # asked for plain JSON; group 1 captures the fenced payload.
    _CODE_FENCE_RE = re.compile(r"^\s*```(?:json)?\s*(.*?)\s*```\s*$", re.DOTALL | re.IGNORECASE)

    def __init__(self, config: ConfigManager):
        self.llm = ChatOpenAI(
            temperature=0.1,
            openai_api_key=config.openai_api_key,
            model_name="gpt-4",
        )
        # Prompt template for entity extraction. Literal braces in the JSON
        # example are doubled so PromptTemplate does not read them as variables.
        self.entity_prompt = PromptTemplate(
            input_variables=["text", "founder_name"],
            template="""
You are an expert at extracting structured information from business meeting transcripts.
Founder: {founder_name}
From the following transcript text, extract:
1. PEOPLE mentioned (names, roles, companies they work for)
2. COMPANIES mentioned
3. TOPICS discussed (products, strategies, challenges, opportunities)
4. RELATIONSHIPS between entities (who works with whom, what topics relate to what companies, etc.)
Text: {text}
Return your response as valid JSON with this structure:
{{
"entities": [
{{"name": "entity_name", "type": "PERSON|COMPANY|TOPIC", "context": "relevant_context", "confidence": 0.9}}
],
"relationships": [
{{"source": "entity1", "target": "entity2", "relation_type": "WORKS_WITH|DISCUSSES|LEADS|etc", "context": "context", "confidence": 0.8}}
]
}}
Focus on business-relevant information. Be precise with entity names and relationship types.
""",
        )

    @classmethod
    def _strip_code_fence(cls, response: str) -> str:
        """Return the payload inside a surrounding markdown code fence, or *response* unchanged."""
        match = cls._CODE_FENCE_RE.match(response)
        return match.group(1) if match else response

    @staticmethod
    def _build_items(raw_items: Any, item_cls: type) -> list:
        """Convert a JSON list of objects into *item_cls* dataclass instances.

        Keys the dataclass does not declare are ignored. An item that is not
        an object, or that lacks a required field, is skipped with a warning,
        so one bad item no longer discards the rest of the chunk.
        """
        if not isinstance(raw_items, list):
            return []
        allowed = {fld.name for fld in fields(item_cls)}
        items = []
        for raw in raw_items:
            if not isinstance(raw, dict):
                continue
            try:
                items.append(item_cls(**{k: v for k, v in raw.items() if k in allowed}))
            except TypeError:
                logger.warning(f"Skipping malformed {item_cls.__name__}: {raw!r}")
        return items

    def extract_from_text(self, text: str, founder_name: str) -> Tuple[List[Entity], List[Relationship]]:
        """Extract entities and relationships from a text chunk.

        Args:
            text: Transcript chunk (or a user question) to analyse.
            founder_name: Founder whose meetings the text belongs to; it is
                inserted into the prompt.

        Returns:
            ``(entities, relationships)``. Both lists are empty when the LLM
            call fails or its reply is not a JSON object. Individual malformed
            items are dropped.
        """
        try:
            prompt = self.entity_prompt.format(text=text, founder_name=founder_name)
            # invoke() replaces the deprecated predict(); it returns a message
            # whose .content is the model's text.
            response = self.llm.invoke(prompt).content
            data = json.loads(self._strip_code_fence(response))
        except Exception as e:
            # Best-effort boundary: API errors and unparseable replies must not
            # abort processing of the remaining chunks.
            logger.error(f"Error extracting entities: {e}")
            return [], []
        if not isinstance(data, dict):
            logger.error(f"Error extracting entities: expected a JSON object, got {type(data).__name__}")
            return [], []
        entities = self._build_items(data.get('entities'), Entity)
        relationships = self._build_items(data.get('relationships'), Relationship)
        return entities, relationships
class PineconeVectorStore:
    """Manages Pinecone vector storage operations.

    Chunks are embedded with OpenAI ``text-embedding-3-large`` and stored in a
    3072-dimension cosine index, one namespace per caller-supplied name.
    """

    def __init__(self, config: ConfigManager, index_name: str = "founder-transcripts"):
        self.pc = Pinecone(api_key=config.pinecone_config['api_key'])
        # NOTE(review): embed() below calls the openai client directly, so this
        # LangChain wrapper is unused here; it is kept so code reading
        # ``self.embeddings`` keeps working.
        self.embeddings = OpenAIEmbeddings(
            openai_api_key=config.openai_api_key,
            model="text-embedding-3-large",
            dimensions=3072,
        )
        if not self.pc.has_index(index_name):
            # The dimension must match the vectors embed() produces (3072 for
            # text-embedding-3-large, as configured above).
            self.pc.create_index(
                name=index_name,
                dimension=3072,
                metric="cosine",
                spec=ServerlessSpec(cloud='aws', region='us-east-1'),
            )
        self.index = self.pc.Index(index_name)

    @staticmethod
    def _vector_id(text: str) -> str:
        """Return a stable vector ID for *text*: the SHA-256 hex digest of its UTF-8 bytes.

        Built-in ``hash()`` of a ``str`` is salted per interpreter process, so
        IDs derived from it differ between runs. Re-ingesting the same
        transcript would then add duplicates instead of overwriting.
        """
        return hashlib.sha256(text.encode("utf-8")).hexdigest()

    def add_documents(self, documents: List[Document], namespace: str = 'ns1', batch_size: int = 32):
        """Embed *documents* and upsert them into *namespace*.

        Each vector's metadata is a copy of the document metadata plus the raw
        chunk text under ``'text'``, so the text can be recovered from query
        matches. IDs are derived from the chunk text, so re-adding identical
        text overwrites the existing vector instead of duplicating it.

        Args:
            documents: Chunks to store; an empty list is a no-op.
            namespace: Target Pinecone namespace.
            batch_size: Number of vectors per upsert request.
        """
        if not documents:
            # Nothing to store, so skip the embedding and upsert calls entirely.
            return
        doc_embeds = self.embed([doc.page_content for doc in documents])
        vectors = []
        for doc, values in zip(documents, doc_embeds):
            metadata = doc.metadata.copy() if doc.metadata else {}
            metadata['text'] = doc.page_content
            vectors.append({
                "id": self._vector_id(doc.page_content),
                "values": values,
                "metadata": metadata,
            })
        self.index.upsert(vectors=vectors, namespace=namespace, batch_size=batch_size)

    def similarity_search(self, query: str, namespace: str = 'ns1', k: int = 5):
        """Return the Pinecone query response for the *k* chunks nearest to *query*.

        The response is returned as-is (matches with metadata, without vector
        values); it is not converted into ``Document`` objects.
        """
        query_embedding = self.embed([query])[0]
        return self.index.query(
            namespace=namespace,
            vector=query_embedding,
            top_k=k,
            include_values=False,
            include_metadata=True,
        )

    def embed(self, texts: list[str]) -> list[list[float]]:
        """Embed *texts* with ``text-embedding-3-large``, returning one vector per input, in order."""
        res = openai.embeddings.create(input=texts, model="text-embedding-3-large")
        return [r.embedding for r in res.data]
class TranscriptProcessor:
    """Loads transcript CSV files and splits them into chunks for embedding and extraction."""

    def __init__(self):
        # Separators are tried in order. The trailing "" lets the splitter fall
        # back to cutting between characters, and the Unicode entries cover
        # scripts that do not separate words with spaces.
        self.text_splitter = RecursiveCharacterTextSplitter(
            separators=[
                "\n\n",
                "\n",
                " ",
                ".",
                ",",
                "\u200b",  # Zero-width space
                "\uff0c",  # Fullwidth comma
                "\u3001",  # Ideographic comma
                "\uff0e",  # Fullwidth full stop
                "\u3002",  # Ideographic full stop
                "",
            ],
            chunk_size=1000,
            chunk_overlap=0,
            length_function=len,
        )

    def load_csv_transcript(self, file_path: str) -> List[Dict]:
        """Load transcript rows from a CSV file that has a header row.

        Returns:
            One dict per data row, keyed by header name. ``process_transcript_row``
            reads the ``speaker``, ``text``, ``timestamp`` and ``meeting_id`` columns.
        """
        # The csv module requires newline='' so quoted fields that contain line
        # breaks are read intact. 'utf-8-sig' decodes plain UTF-8 identically,
        # and also strips the byte-order mark Excel writes, which would
        # otherwise corrupt the first header (e.g. '\ufeffspeaker').
        with open(file_path, 'r', encoding='utf-8-sig', newline='') as file:
            return list(csv.DictReader(file))

    def process_transcript_row(self, row: Dict) -> Document:
        """Convert a transcript row into a Document that carries speaker and text.

        Missing ``speaker`` becomes ``'Unknown'``. A row without a
        ``meeting_id`` column gets a fresh random UUID, unique per row.
        """
        content = f"Speaker: {row.get('speaker', 'Unknown')}\nText: {row.get('text', '')}"
        metadata = {
            'speaker': row.get('speaker', 'Unknown'),
            'timestamp': row.get('timestamp', ''),
            'meeting_id': row.get('meeting_id', str(uuid.uuid4())),
            'source': 'transcript',
        }
        return Document(page_content=content, metadata=metadata)

    def chunk_documents(self, documents: List[Document]) -> List[Document]:
        """Split documents into chunks of at most 1000 characters, with no overlap."""
        return self.text_splitter.split_documents(documents)
class Neo4jGraphStore:
    """Manages Neo4j graph database operations.

    Nodes written here are keyed by ``(name, namespace)``, so each founder's
    entities stay isolated within a shared database.
    """

    # Characters not allowed in labels built from LLM output. Only word
    # characters survive, so no backtick or escape sequence can reach the query.
    _LABEL_UNSAFE_RE = re.compile(r"\W")

    def __init__(self, config: ConfigManager):
        neo4j_config = config.neo4j_config
        # The Neo4j Python driver documents ``database`` as a *session* option,
        # so it is kept here and passed to every session (``None`` means the
        # server's default database).
        self._database = neo4j_config['database']
        self.driver = None
        try:
            self.driver = GraphDatabase.driver(
                neo4j_config['uri'],
                auth=(neo4j_config['user'], neo4j_config['password']),
            )
            # Test the connection before doing any work.
            self.driver.verify_connectivity()
            logger.info("Successfully connected to Neo4j database")
        except Exception as e:
            logger.error(f"Failed to connect to Neo4j: {str(e)}")
            if self.driver is not None:
                # Release the connection pool opened by driver() before giving up.
                self.driver.close()
            raise ValueError(
                "Could not connect to Neo4j database. Please check your connection URI and credentials."
            ) from e
        self.setup_constraints()

    def _session(self):
        """Open a session on the configured database."""
        return self.driver.session(database=self._database)

    @classmethod
    def _quote_label(cls, label, default: str = "Entity") -> str:
        """Return *label* as a safe, backtick-quoted Cypher label.

        Labels cannot be query parameters, so they are interpolated into the
        query text. Non-word characters become ``_`` (e.g. ``"Product
        Strategy"`` -> ``Product_Strategy``), and a blank label falls back to
        *default*. Labels made only of word characters are unchanged.
        """
        cleaned = cls._LABEL_UNSAFE_RE.sub("_", str(label).strip()) if label is not None else ""
        return f"`{cleaned or default}`"

    def setup_constraints(self):
        """Create the uniqueness constraint for every label in NODES."""
        with self._session() as session:
            for node in NODES:
                session.execute_write(set_uniquness_constraints, node)
                logger.info(f"Set uniqueness constraint for {node}")

    def add_founder(self, founder_name: str, namespace: str):
        """Add a founder node with namespace isolation.

        ``created_at`` is set only when the node is first created, so
        re-processing a founder keeps the original timestamp.
        """
        with self._session() as session:
            session.run(
                """
                MERGE (f:Founder {name: $founder_name, namespace: $namespace})
                ON CREATE SET f.created_at = datetime()
                """,
                founder_name=founder_name,
                namespace=namespace,
            )

    def add_entities(self, entities: List[Entity], founder_name: str, namespace: str):
        """Upsert entity nodes and link each to the founder with ``MENTIONS``.

        The node label comes from ``entity.type`` (sanitised). Entities without
        a name are skipped, because MERGE cannot match on a null property. If
        the founder node does not exist, the entity is still written but left
        unlinked.
        """
        with self._session() as session:
            for entity in entities:
                if not entity.name:
                    logger.warning(f"Skipping entity without a name: {entity!r}")
                    continue
                label = self._quote_label(entity.type)
                # One round-trip per entity: upsert the node, then link it.
                session.run(
                    f"""
                    MERGE (e:{label} {{name: $name, namespace: $namespace}})
                    SET e.context = $context,
                        e.confidence = $confidence,
                        e.updated_at = datetime()
                    WITH e
                    MATCH (f:Founder {{name: $founder_name, namespace: $namespace}})
                    MERGE (f)-[:MENTIONS]->(e)
                    """,
                    name=entity.name,
                    namespace=namespace,
                    context=entity.context,
                    confidence=entity.confidence,
                    founder_name=founder_name,
                )

    def add_relationships(self, relationships: List[Relationship], namespace: str):
        """Create typed relationships between existing entities in *namespace*.

        Uses APOC's ``apoc.create.relationship`` because the relationship type
        is dynamic. Relationships with a blank type are skipped with a warning.
        """
        # NOTE(review): apoc.create.relationship always creates a new edge, so
        # re-processing a transcript duplicates relationships; the unlabelled
        # MATCH also pairs every same-named node. Consider apoc.merge.relationship
        # and label-qualified matches.
        with self._session() as session:
            for rel in relationships:
                if not rel.relation_type or not str(rel.relation_type).strip():
                    logger.warning(f"Skipping relationship without a type: {rel!r}")
                    continue
                session.run(
                    """
                    MATCH (a {name: $source, namespace: $namespace})
                    MATCH (b {name: $target, namespace: $namespace})
                    CALL apoc.create.relationship(a, $relation_type, {
                        context: $context,
                        confidence: $confidence,
                        created_at: datetime()
                    }, b) YIELD rel
                    RETURN rel
                    """,
                    source=rel.source,
                    target=rel.target,
                    relation_type=rel.relation_type,
                    context=rel.context,
                    confidence=rel.confidence,
                    namespace=namespace,
                )

    def query_entities_by_founder(self, founder_name: str, namespace: str, entity_type: str = None) -> List[Dict]:
        """Return entities the founder ``MENTIONS``, optionally restricted to one label.

        Rows contain ``name``, ``context`` and ``confidence``. Without
        *entity_type* they also contain ``type``, the node's first label.
        """
        with self._session() as session:
            if entity_type:
                query = f"""
                MATCH (f:Founder {{name: $founder_name, namespace: $namespace}})-[:MENTIONS]->(e:{self._quote_label(entity_type)})
                RETURN e.name as name, e.context as context, e.confidence as confidence
                """
            else:
                query = """
                MATCH (f:Founder {name: $founder_name, namespace: $namespace})-[:MENTIONS]->(e)
                RETURN labels(e)[0] as type, e.name as name, e.context as context, e.confidence as confidence
                """
            result = session.run(query, founder_name=founder_name, namespace=namespace)
            return [record.data() for record in result]

    def query_relationships(self, entity_name: str, namespace: str) -> List[Dict]:
        """Return outgoing and incoming relationships of the named entity.

        Rows contain ``source``, ``relation``, ``target`` and ``context``.
        ``UNION`` removes duplicate rows.
        """
        with self._session() as session:
            result = session.run(
                """
                MATCH (a {name: $entity_name, namespace: $namespace})-[r]->(b)
                RETURN a.name as source, type(r) as relation, b.name as target, r.context as context
                UNION
                MATCH (a)-[r]->(b {name: $entity_name, namespace: $namespace})
                RETURN a.name as source, type(r) as relation, b.name as target, r.context as context
                """,
                entity_name=entity_name,
                namespace=namespace,
            )
            return [record.data() for record in result]

    def close(self):
        """Close the driver and its connection pool."""
        self.driver.close()
class GraphRAGSystem:
    """Main system that orchestrates graph (Neo4j) and vector (Pinecone) operations."""

    def __init__(self, config: ConfigManager):
        self.config = config
        self.graph_store = Neo4jGraphStore(config)
        self.vector_store = PineconeVectorStore(config)
        self.transcript_processor = TranscriptProcessor()
        self.entity_extractor = EntityExtractor(config)
        self.qa_llm = ChatOpenAI(
            temperature=0.2,
            openai_api_key=config.openai_api_key,
            model_name="gpt-4",
        )

    @staticmethod
    def _namespace_for(founder_name: str) -> str:
        """Derive the per-founder namespace shared by the vector store and the graph."""
        return founder_name.lower().replace(' ', '_')

    @staticmethod
    def _vector_match_texts(vector_results) -> List[str]:
        """Extract the stored chunk text from a Pinecone query response.

        Accepts either the SDK response object (``.matches``, each with
        ``.metadata``) or its dict form (``['matches']`` / ``['metadata']``).
        Matches without ``metadata['text']`` are skipped.
        """
        if vector_results is None:
            return []
        matches = getattr(vector_results, "matches", None)
        if matches is None and isinstance(vector_results, dict):
            matches = vector_results.get("matches")
        texts = []
        for match in matches or []:
            metadata = getattr(match, "metadata", None)
            if metadata is None and isinstance(match, dict):
                metadata = match.get("metadata")
            text = (metadata or {}).get("text")
            if text:
                texts.append(text)
        return texts

    def process_transcript_file(self, file_path: str, founder_name: str):
        """Ingest a transcript CSV for a founder into both stores.

        Rows are chunked, embedded into the founder's Pinecone namespace, and
        run through LLM entity extraction. The results are then written to Neo4j.
        """
        logger.info(f"Processing transcript file for {founder_name}")
        namespace = self._namespace_for(founder_name)
        self.graph_store.add_founder(founder_name, namespace)

        transcript_rows = self.transcript_processor.load_csv_transcript(file_path)
        documents = [self.transcript_processor.process_transcript_row(row) for row in transcript_rows]
        chunked_docs = self.transcript_processor.chunk_documents(documents)

        # Embed in groups of 100 chunks per embeddings request (keeping each
        # request small); each group is then upserted 32 vectors at a time.
        batch_size = 32
        embedding_chunk_size = 100
        total_batches = (len(chunked_docs) + embedding_chunk_size - 1) // embedding_chunk_size
        for batch_number, start in enumerate(range(0, len(chunked_docs), embedding_chunk_size), start=1):
            batch_docs = chunked_docs[start:start + embedding_chunk_size]
            logger.info(f"Processing batch {batch_number} of {total_batches}")
            self.vector_store.add_documents(batch_docs, namespace=namespace, batch_size=batch_size)

        # Extract entities and relationships from each chunk (one LLM call per chunk).
        all_entities = []
        all_relationships = []
        for doc in chunked_docs:
            entities, relationships = self.entity_extractor.extract_from_text(doc.page_content, founder_name)
            all_entities.extend(entities)
            all_relationships.extend(relationships)

        if all_entities:
            self.graph_store.add_entities(all_entities, founder_name, namespace)
        if all_relationships:
            self.graph_store.add_relationships(all_relationships, namespace)
        logger.info(f"Processed {len(documents)} transcript entries, extracted {len(all_entities)} entities and {len(all_relationships)} relationships")

    def hybrid_search(self, query: str, founder_name: str, k: int = 5) -> Dict[str, Any]:
        """Combine vector similarity search with graph lookups for entities found in *query*.

        Returns:
            ``vector_results``: the raw Pinecone query response.
            ``graph_results``: the founder's mentioned entities (fetched once,
            only when the query names at least one entity), followed by the
            relationships of each query entity.
            ``query_entities``: the entities extracted from the query.
        """
        namespace = self._namespace_for(founder_name)
        vector_results = self.vector_store.similarity_search(query, namespace, k)

        query_entities, _ = self.entity_extractor.extract_from_text(query, founder_name)
        graph_results = []
        if query_entities:
            # The founder's entity list does not depend on the query entity, so
            # fetch it once rather than once per entity; repeats would crowd
            # the limited answer context with duplicates.
            graph_results.extend(self.graph_store.query_entities_by_founder(founder_name, namespace))
            for entity in query_entities:
                graph_results.extend(self.graph_store.query_relationships(entity.name, namespace))

        return {
            'vector_results': vector_results,
            'graph_results': graph_results,
            'query_entities': query_entities,
        }

    def answer_question(self, question: str, founder_name: str) -> str:
        """Answer *question* with an LLM grounded in transcript chunks and graph facts.

        Transcript chunks come first, then entity and relationship lines. At
        most 10 context items are sent to the model.
        """
        search_results = self.hybrid_search(question, founder_name)

        # Pinecone returns matches, not Documents; the chunk text is stored in
        # each match's metadata under 'text' at ingestion time.
        context_parts = [
            f"Transcript: {text}" for text in self._vector_match_texts(search_results['vector_results'])
        ]
        for result in search_results['graph_results']:
            if 'name' in result and 'context' in result:
                context_parts.append(f"Entity: {result['name']} - {result['context']}")
            elif 'source' in result and 'relation' in result:
                context_parts.append(f"Relationship: {result['source']} {result['relation']} {result['target']}")

        combined_context = "\n\n".join(context_parts[:10])  # Limit context size
        prompt = f"""
Based on the following information about {founder_name}'s meetings and business activities, answer the question comprehensively.
Context from transcripts and knowledge graph:
{combined_context}
Question: {question}
Provide a detailed answer based on the available information. If the information is insufficient, say so clearly.
"""
        # invoke() replaces the deprecated predict(); .content is the answer text.
        return self.qa_llm.invoke(prompt).content
def main():
    """Demo entry point: answer a fixed list of questions about one founder.

    The ingestion call is commented out, so this run only queries data that
    is already in Neo4j and Pinecone. Uncomment it to load ``transcript_file``
    first. The Neo4j driver is always closed on exit.
    """
    config = ConfigManager()
    system = GraphRAGSystem(config)

    founder_name = "Ronak"
    transcript_file = "adaptive-anand.csv"  # Your CSV file path
    try:
        # Process the transcript
        # system.process_transcript_file(transcript_file, founder_name)

        questions = [
            "What challenges did the founder discuss?",
            "Who are the key people mentioned in the meetings?",
            "What companies were discussed?",
            "What strategies were mentioned for growth?",
        ]
        for question in questions:
            print(f"\nQuestion: {question}")
            answer = system.answer_question(question, founder_name)
            print(f"Answer: {answer}")
            print("-" * 80)
    except Exception as e:
        # Top-level boundary: log with the full traceback (logger.error
        # discarded it) instead of crashing the script.
        logger.exception("Error in main execution: %s", e)
    finally:
        # Clean up connections
        system.graph_store.close()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment