Created May 29, 2025 15:16
Save deepakkashyap3013/2f898f52ee30632ddc6a5cfdefff24be to your computer and use it in GitHub Desktop.
GraphRAG with Neo4j + Pinecone vector DB (hybrid solution)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import csv | |
| import logging | |
| import uuid | |
| import json | |
| from typing import Dict, List, Tuple, Any | |
| from retry import retry | |
| from dotenv import load_dotenv | |
| from neo4j import GraphDatabase | |
| import pandas as pd | |
| from langchain_openai import ChatOpenAI, OpenAIEmbeddings | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from langchain_core.documents import Document | |
| from langchain_core.prompts import PromptTemplate | |
| from pinecone import Pinecone, ServerlessSpec | |
| import openai | |
| from dataclasses import dataclass | |
# --- Environment ----------------------------------------------------------
# Read variables from a .env file in the current working directory, then hand
# the OpenAI key to the module-level ``openai`` client (it is used directly by
# the embedding helper of the vector store).
load_dotenv(dotenv_path='.env')
openai.api_key = os.getenv('OPENAI_API_KEY')

# --- Logging --------------------------------------------------------------
# Process-wide configuration: timestamped, level-tagged records at INFO.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# --- Graph schema ---------------------------------------------------------
# Node labels that get a uniqueness constraint when the graph store starts up.
# NOTE(review): the extraction prompt asks for upper-case types
# (PERSON/COMPANY/TOPIC), which become node labels verbatim, while these are
# title-case; the constraints are also on ``id``, a property the MERGE
# statements never set (they key on name + namespace). Confirm intent.
NODES = ["Person", "Company", "Topic", "Founder"]
@dataclass
class Entity:
    """An entity (person, company, topic, ...) extracted from transcript text.

    Instances are usually built from LLM-produced JSON (``Entity(**item)``),
    whose values are not type-checked, so ``__post_init__`` normalises
    ``confidence`` to a ``float``.
    """
    name: str
    type: str  # e.g. PERSON, COMPANY, TOPIC; used as the Neo4j node label
    context: str
    confidence: float = 0.8

    def __post_init__(self):
        # Dataclasses do not enforce annotations: an LLM may emit "0.9" or 1,
        # which would otherwise be stored as-is. Non-numeric values raise
        # ValueError/TypeError so callers can reject the malformed record.
        self.confidence = float(self.confidence)
@dataclass
class Relationship:
    """A directed relationship ``source -[relation_type]-> target`` between two entities.

    ``source`` and ``target`` are entity names. As with ``Entity``, values
    usually come from untyped LLM JSON, so ``confidence`` is coerced to
    ``float`` in ``__post_init__``.
    """
    source: str
    target: str
    relation_type: str  # e.g. WORKS_WITH, DISCUSSES, LEADS
    context: str
    confidence: float = 0.8

    def __post_init__(self):
        # Normalise numeric strings/ints; non-numeric values raise
        # ValueError/TypeError so the malformed record can be rejected.
        self.confidence = float(self.confidence)
class ConfigManager:
    """Read and validate the credentials/endpoints this module needs.

    Every value is read from the process environment. Construction fails fast
    with ``ValueError`` when any required variable is unset or empty;
    ``NEO4J_DATABASE`` is optional and may come back as ``None``.
    """

    def __init__(self):
        self.required_keys = [
            'OPENAI_API_KEY',
            'PINECONE_API_KEY',
            'PINECONE_HOST',
            'NEO4J_URI',
            'NEO4J_USERNAME',
            'NEO4J_PASSWORD'
        ]
        self.validate_environment()

    def validate_environment(self):
        """Raise ValueError naming every required variable that is unset or empty."""
        absent = list(filter(lambda var: not os.getenv(var), self.required_keys))
        if not absent:
            return
        raise ValueError(f"Missing required environment variables: {absent}")

    @property
    def openai_api_key(self) -> str:
        """The OpenAI API key (``OPENAI_API_KEY``)."""
        return os.getenv('OPENAI_API_KEY')

    @property
    def pinecone_config(self) -> Dict[str, str]:
        """Pinecone settings as ``{'api_key': ..., 'host': ...}``."""
        api_key = os.getenv('PINECONE_API_KEY')
        host = os.getenv('PINECONE_HOST')
        return dict(api_key=api_key, host=host)

    @property
    def neo4j_config(self) -> Dict[str, str]:
        """Neo4j settings keyed ``uri``/``user``/``password``/``database``."""
        env_for_field = {
            'uri': 'NEO4J_URI',
            'user': 'NEO4J_USERNAME',
            'password': 'NEO4J_PASSWORD',
            'database': 'NEO4J_DATABASE',
        }
        return {field: os.getenv(var) for field, var in env_for_field.items()}
@retry(tries=10, delay=10)
def set_uniquness_constraints(tx, node):
    """Idempotently create a uniqueness constraint on ``id`` for label ``node``.

    Intended as a transaction function, e.g.
    ``session.execute_write(set_uniquness_constraints, "Person")``.
    ``node`` is interpolated into the Cypher text, so it must be a trusted
    label (here it comes from the module-level NODES list).

    NOTE(review): the query result is never consumed, so a server error may
    only surface at commit time (outside this function), in which case the
    ``@retry`` wrapper would not fire. Confirm the retry is still wanted.
    """
    cypher = f"""CREATE CONSTRAINT IF NOT EXISTS FOR
    (n:{node}) REQUIRE n.id IS UNIQUE;"""
    tx.run(cypher)
class EntityExtractor:
    """Extract entities and relationships from transcript text with an LLM.

    The LLM is prompted to answer with a JSON object holding ``entities`` and
    ``relationships`` lists; those are converted into ``Entity`` and
    ``Relationship`` instances.
    """

    def __init__(self, config: ConfigManager):
        self.llm = ChatOpenAI(
            temperature=0.1,
            openai_api_key=config.openai_api_key,
            model_name="gpt-4"
        )
        # Prompt template for entity extraction. Literal JSON braces are doubled
        # ({{ }}) so PromptTemplate's formatting leaves them intact.
        self.entity_prompt = PromptTemplate(
            input_variables=["text", "founder_name"],
            template="""
You are an expert at extracting structured information from business meeting transcripts.
Founder: {founder_name}
From the following transcript text, extract:
1. PEOPLE mentioned (names, roles, companies they work for)
2. COMPANIES mentioned
3. TOPICS discussed (products, strategies, challenges, opportunities)
4. RELATIONSHIPS between entities (who works with whom, what topics relate to what companies, etc.)
Text: {text}
Return your response as valid JSON with this structure:
{{
"entities": [
{{"name": "entity_name", "type": "PERSON|COMPANY|TOPIC", "context": "relevant_context", "confidence": 0.9}}
],
"relationships": [
{{"source": "entity1", "target": "entity2", "relation_type": "WORKS_WITH|DISCUSSES|LEADS|etc", "context": "context", "confidence": 0.8}}
]
}}
Return only the JSON object, with no surrounding prose or Markdown.
Focus on business-relevant information. Be precise with entity names and relationship types.
"""
        )

    def extract_from_text(self, text: str, founder_name: str) -> Tuple[List[Entity], List[Relationship]]:
        """Extract entities and relationships from a text chunk.

        Args:
            text: Transcript chunk (or a user question) to analyse.
            founder_name: Founder the text belongs to; injected into the prompt.

        Returns:
            ``(entities, relationships)``. Individual items in an unexpected
            shape are skipped (and logged). If the LLM call or JSON parsing
            fails, ``([], [])`` is returned and the error is logged; this is a
            deliberate best-effort so one bad chunk does not stop ingestion.
        """
        try:
            prompt = self.entity_prompt.format(text=text, founder_name=founder_name)
            # ``predict`` is deprecated in LangChain; ``invoke`` returns an AIMessage.
            response = self.llm.invoke(prompt).content
            data = json.loads(self._strip_code_fence(response))
        except Exception:
            logger.exception("Error extracting entities")
            return [], []

        if not isinstance(data, dict):
            logger.error("Entity extraction returned non-object JSON (%s)", type(data).__name__)
            return [], []

        entities = self._build_items(Entity, data.get('entities'))
        relationships = self._build_items(Relationship, data.get('relationships'))
        return entities, relationships

    @staticmethod
    def _strip_code_fence(response: str) -> str:
        """Return ``response`` without a surrounding Markdown code fence, if present."""
        cleaned = response.strip()
        if not cleaned.startswith("```"):
            return cleaned
        # Drop the opening fence line (it may carry a language tag such as ```json).
        first_newline = cleaned.find("\n")
        body = cleaned[first_newline + 1:] if first_newline != -1 else cleaned[3:]
        body = body.rstrip()
        if body.endswith("```"):
            body = body[:-3]
        return body.strip()

    @staticmethod
    def _build_items(item_type, raw_items):
        """Instantiate ``item_type`` from each mapping in ``raw_items``, skipping malformed ones."""
        items = []
        for raw in raw_items or []:
            try:
                # TypeError: missing/extra keys or a non-mapping item;
                # ValueError: e.g. a non-numeric confidence.
                items.append(item_type(**raw))
            except (TypeError, ValueError) as exc:
                logger.warning("Skipping malformed %s %r: %s", item_type.__name__, raw, exc)
        return items
class PineconeVectorStore:
    """Store and search transcript chunks in a Pinecone serverless index.

    Vectors are 3072-dimensional ``text-embedding-3-large`` embeddings compared
    with cosine similarity. Each vector's metadata carries the chunk text under
    ``'text'`` so query matches can be turned back into context.
    """

    def __init__(self, config: ConfigManager, index_name: str = "founder-transcripts"):
        self.pc = Pinecone(
            api_key=config.pinecone_config['api_key']
        )
        # NOTE(review): embed() calls the openai client directly and does not
        # use this LangChain object; it is kept for external callers.
        self.embeddings = OpenAIEmbeddings(
            openai_api_key=config.openai_api_key,
            model="text-embedding-3-large",
            dimensions=3072
        )
        if not self.pc.has_index(index_name):
            self.pc.create_index(
                name=index_name,
                dimension=3072,
                metric="cosine",
                spec=ServerlessSpec(
                    cloud='aws',
                    region='us-east-1'
                )
            )
        # The data-plane handle is always obtained here, whether or not the
        # index was just created.
        self.index = self.pc.Index(index_name)

    def add_documents(self, documents: List[Document], namespace: str = 'ns1', batch_size: int = 32):
        """Embed ``documents`` and upsert them into ``namespace``.

        Vector IDs are derived deterministically from the chunk text, so
        re-ingesting the same text overwrites the existing vector instead of
        adding a duplicate. An empty ``documents`` list is a no-op.
        """
        if not documents:
            return
        doc_embeds = self.embed([doc.page_content for doc in documents])
        vectors = []
        for doc, embed in zip(documents, doc_embeds):
            # Pinecone metadata values must be strings, numbers, booleans or
            # lists of strings, so None values are dropped.
            metadata = {
                key: value
                for key, value in (doc.metadata or {}).items()
                if value is not None
            }
            metadata['text'] = doc.page_content
            vectors.append({
                # Built-in hash() of a str is salted per process
                # (PYTHONHASHSEED), so it cannot yield stable IDs; uuid5 can.
                "id": str(uuid.uuid5(uuid.NAMESPACE_URL, doc.page_content)),
                "values": embed,
                "metadata": metadata
            })
        self.index.upsert(
            vectors=vectors,
            namespace=namespace,
            batch_size=batch_size
        )

    def similarity_search(self, query: str, namespace: str = 'ns1', k: int = 5):
        """Return the raw Pinecone query response for the ``k`` nearest chunks in ``namespace``."""
        query_embedding = self.embed([query])[0]
        return self.index.query(
            namespace=namespace,
            vector=query_embedding,
            top_k=k,
            include_values=False,
            include_metadata=True
        )

    def embed(self, texts: list[str]) -> list[list[float]]:
        """Embed ``texts`` with ``text-embedding-3-large``, one vector per input, in order.

        An empty input returns ``[]`` without calling the API.
        """
        if not texts:
            return []
        res = openai.embeddings.create(
            input=texts,
            model="text-embedding-3-large"
        )
        return [r.embedding for r in res.data]
class TranscriptProcessor:
    """Load CSV transcripts, turn rows into Documents and split them into chunks."""

    def __init__(self):
        self.text_splitter = RecursiveCharacterTextSplitter(
            separators=[
                "\n\n",
                "\n",
                " ",
                ".",
                ",",
                "\u200b",  # Zero-width space
                "\uff0c",  # Fullwidth comma
                "\u3001",  # Ideographic comma
                "\uff0e",  # Fullwidth full stop
                "\u3002",  # Ideographic full stop
                "",
            ],
            chunk_size=1000,
            chunk_overlap=0,
            length_function=len,
        )

    def load_csv_transcript(self, file_path: str) -> List[Dict]:
        """Load transcript rows from a CSV file that has a header row.

        Returns one dict per data row, keyed by header name. Columns read later
        are ``speaker``, ``text``, ``timestamp`` and ``meeting_id``.
        """
        # newline='' is required by the csv module so quoted fields containing
        # line breaks parse correctly; 'utf-8-sig' also strips a leading BOM
        # (common in spreadsheet exports) that would corrupt the first header.
        with open(file_path, 'r', encoding='utf-8-sig', newline='') as file:
            return list(csv.DictReader(file))

    def process_transcript_row(self, row: Dict) -> Document:
        """Convert a transcript row to a Document (speaker + text, with metadata).

        Missing or blank values fall back to defaults. ``csv.DictReader`` yields
        None for fields absent from a short row, which previously leaked into the
        content ("Speaker: None") and the metadata.
        """
        speaker = row.get('speaker') or 'Unknown'
        text = row.get('text') or ''
        content = f"Speaker: {speaker}\nText: {text}"
        metadata = {
            'speaker': speaker,
            'timestamp': row.get('timestamp') or '',
            # Random ID only when the column is missing or blank.
            'meeting_id': row.get('meeting_id') or str(uuid.uuid4()),
            'source': 'transcript'
        }
        return Document(page_content=content, metadata=metadata)

    def chunk_documents(self, documents: List[Document]) -> List[Document]:
        """Split documents into chunks of at most 1000 characters (no overlap)."""
        return self.text_splitter.split_documents(documents)
class Neo4jGraphStore:
    """Persist founders, extracted entities and their relationships in Neo4j.

    Every node carries a ``namespace`` property (one per founder), so data for
    different founders stays separate even when entity names collide.
    """

    def __init__(self, config: ConfigManager):
        neo4j_config = config.neo4j_config
        # The target database is a session-level option in the driver API, so
        # it is passed to every session; None selects the server default.
        self.database = neo4j_config['database']
        self.driver = None
        try:
            self.driver = GraphDatabase.driver(
                neo4j_config['uri'],
                auth=(neo4j_config['user'], neo4j_config['password']),
            )
            # Test the connection
            self.driver.verify_connectivity()
            logger.info("Successfully connected to Neo4j database")
        except Exception as e:
            if self.driver is not None:
                self.driver.close()  # don't leak the pool of a failed driver
            logger.error(f"Failed to connect to Neo4j: {str(e)}")
            raise ValueError("Could not connect to Neo4j database. Please check your connection URI and credentials.") from e
        self.setup_constraints()

    def _session(self):
        """Open a session bound to the configured database."""
        return self.driver.session(database=self.database)

    @staticmethod
    def _quote_label(label: str) -> str:
        """Return ``label`` as a backtick-quoted Cypher identifier.

        Labels cannot be query parameters. Values coming from LLM output or from
        callers are therefore escaped (embedded backticks doubled) rather than
        interpolated raw, which prevents Cypher injection and syntax errors from
        labels containing spaces or punctuation.

        Raises:
            ValueError: if ``label`` is empty or whitespace-only.
        """
        cleaned = (label or "").strip()
        if not cleaned:
            raise ValueError("Node label must be a non-empty string")
        return "`" + cleaned.replace("`", "``") + "`"

    def setup_constraints(self):
        """Create the uniqueness constraints for every label in NODES."""
        with self._session() as session:
            for node in NODES:
                session.execute_write(set_uniquness_constraints, node)
                logger.info(f"Set uniqueness constraint for {node}")

    def add_founder(self, founder_name: str, namespace: str):
        """Ensure a Founder node exists for ``founder_name`` in ``namespace``.

        ``created_at`` is only set when the node is first created, so
        re-processing does not overwrite it.
        """
        with self._session() as session:
            session.run(
                """
                MERGE (f:Founder {name: $founder_name, namespace: $namespace})
                ON CREATE SET f.created_at = datetime()
                """,
                founder_name=founder_name,
                namespace=namespace
            )

    def add_entities(self, entities: List[Entity], founder_name: str, namespace: str):
        """Upsert entity nodes (labelled by ``entity.type``) and link them to the founder.

        Entities with an empty name or type are skipped with a warning instead of
        aborting the whole batch. The founder link is only created if the Founder
        node already exists.
        """
        with self._session() as session:
            for entity in entities:
                if not entity.name:
                    logger.warning("Skipping entity with empty name: %r", entity)
                    continue
                try:
                    label = self._quote_label(entity.type)
                except ValueError:
                    logger.warning("Skipping entity %r with empty type", entity.name)
                    continue
                # Upsert the node and link it to the founder in one auto-commit
                # query. The MERGE/SET run even if the founder MATCH finds nothing.
                session.run(
                    f"""
                    MERGE (e:{label} {{name: $name, namespace: $namespace}})
                    SET e.context = $context,
                        e.confidence = $confidence,
                        e.updated_at = datetime()
                    WITH e
                    MATCH (f:Founder {{name: $founder_name, namespace: $namespace}})
                    MERGE (f)-[:MENTIONS]->(e)
                    """,
                    name=entity.name,
                    namespace=namespace,
                    context=entity.context,
                    confidence=entity.confidence,
                    founder_name=founder_name,
                )

    def add_relationships(self, relationships: List[Relationship], namespace: str):
        """Create or update typed relationships between existing entities in ``namespace``.

        Requires the APOC plugin (the relationship type is dynamic). The edge is
        merged on ``(source, relation_type, target)``, so the same relation seen
        in several chunks or on re-ingestion does not create parallel duplicates.
        Its context/confidence are refreshed on match. Incomplete relationships
        are skipped.
        """
        with self._session() as session:
            for rel in relationships:
                if not (rel.source and rel.target and rel.relation_type):
                    logger.warning("Skipping incomplete relationship %r", rel)
                    continue
                session.run(
                    """
                    MATCH (a {name: $source, namespace: $namespace})
                    MATCH (b {name: $target, namespace: $namespace})
                    CALL apoc.merge.relationship(a, $relation_type, {}, {
                        context: $context,
                        confidence: $confidence,
                        created_at: datetime()
                    }, b, {
                        context: $context,
                        confidence: $confidence
                    }) YIELD rel
                    RETURN rel
                    """,
                    source=rel.source,
                    target=rel.target,
                    relation_type=rel.relation_type,
                    context=rel.context,
                    confidence=rel.confidence,
                    namespace=namespace
                )

    def query_entities_by_founder(self, founder_name: str, namespace: str, entity_type: str = None) -> List[Dict]:
        """Return entities the founder MENTIONS, optionally restricted to one label.

        Without ``entity_type`` each row also carries ``type`` (the node's first label).
        """
        with self._session() as session:
            if entity_type:
                label = self._quote_label(entity_type)
                query = f"""
                MATCH (f:Founder {{name: $founder_name, namespace: $namespace}})-[:MENTIONS]->(e:{label})
                RETURN e.name as name, e.context as context, e.confidence as confidence
                """
            else:
                query = """
                MATCH (f:Founder {name: $founder_name, namespace: $namespace})-[:MENTIONS]->(e)
                RETURN labels(e)[0] as type, e.name as name, e.context as context, e.confidence as confidence
                """
            result = session.run(query, founder_name=founder_name, namespace=namespace)
            return [record.data() for record in result]

    def query_relationships(self, entity_name: str, namespace: str) -> List[Dict]:
        """Return outgoing and incoming relationships of the named entity (UNION removes duplicate rows)."""
        with self._session() as session:
            result = session.run(
                """
                MATCH (a {name: $entity_name, namespace: $namespace})-[r]->(b)
                RETURN a.name as source, type(r) as relation, b.name as target, r.context as context
                UNION
                MATCH (a)-[r]->(b {name: $entity_name, namespace: $namespace})
                RETURN a.name as source, type(r) as relation, b.name as target, r.context as context
                """,
                entity_name=entity_name,
                namespace=namespace
            )
            return [record.data() for record in result]

    def close(self):
        """Close database connection"""
        self.driver.close()
class GraphRAGSystem:
    """Orchestrate ingestion and question answering over the graph and vector stores.

    Each founder gets one namespace (derived from the name), shared by the
    Pinecone namespace and the Neo4j ``namespace`` property.
    """

    def __init__(self, config: ConfigManager):
        self.config = config
        self.graph_store = Neo4jGraphStore(config)
        self.vector_store = PineconeVectorStore(config)
        self.transcript_processor = TranscriptProcessor()
        self.entity_extractor = EntityExtractor(config)
        self.qa_llm = ChatOpenAI(
            temperature=0.2,
            openai_api_key=config.openai_api_key,
            model_name="gpt-4"
        )

    @staticmethod
    def _namespace_for(founder_name: str) -> str:
        """Derive the per-founder namespace used in both stores."""
        return founder_name.lower().replace(' ', '_')

    @staticmethod
    def _vector_match_texts(vector_results) -> List[str]:
        """Return the chunk text stored in each Pinecone match's ``metadata['text']``.

        Accepts the SDK's query response object (``.matches``) or a plain dict
        with a ``'matches'`` key. Matches without text are skipped.
        """
        if isinstance(vector_results, dict):
            matches = vector_results.get('matches') or []
        else:
            matches = getattr(vector_results, 'matches', None) or []
        texts = []
        for match in matches:
            if isinstance(match, dict):
                metadata = match.get('metadata')
            else:
                metadata = getattr(match, 'metadata', None)
            text = (metadata or {}).get('text')
            if text:
                texts.append(text)
        return texts

    def process_transcript_file(self, file_path: str, founder_name: str):
        """Ingest one CSV transcript for a founder into both stores.

        Rows are chunked, embedded into Pinecone in batches of 100 chunks
        (upserted 32 at a time), then passed chunk by chunk through the LLM
        extractor and written to Neo4j.
        """
        logger.info(f"Processing transcript file for {founder_name}")
        namespace = self._namespace_for(founder_name)
        self.graph_store.add_founder(founder_name, namespace)

        transcript_rows = self.transcript_processor.load_csv_transcript(file_path)
        documents = [self.transcript_processor.process_transcript_row(row) for row in transcript_rows]
        chunked_docs = self.transcript_processor.chunk_documents(documents)

        # Smaller batches keep each embeddings request within the API's input limits.
        batch_size = 32  # vectors per Pinecone upsert request
        embedding_chunk_size = 100  # chunks per embeddings request
        total_batches = -(-len(chunked_docs) // embedding_chunk_size)  # ceiling division
        for batch_number, start in enumerate(range(0, len(chunked_docs), embedding_chunk_size), start=1):
            batch_docs = chunked_docs[start:start + embedding_chunk_size]
            logger.info(f"Processing batch {batch_number} of {total_batches}")
            self.vector_store.add_documents(
                batch_docs,
                namespace=namespace,
                batch_size=batch_size
            )

        all_entities = []
        all_relationships = []
        for doc in chunked_docs:
            entities, relationships = self.entity_extractor.extract_from_text(
                doc.page_content,
                founder_name
            )
            all_entities.extend(entities)
            all_relationships.extend(relationships)

        if all_entities:
            self.graph_store.add_entities(all_entities, founder_name, namespace)
        if all_relationships:
            self.graph_store.add_relationships(all_relationships, namespace)
        logger.info(f"Processed {len(documents)} transcript entries, extracted {len(all_entities)} entities and {len(all_relationships)} relationships")

    def hybrid_search(self, query: str, founder_name: str, k: int = 5) -> Dict[str, Any]:
        """Combine vector similarity results with graph facts for entities found in ``query``.

        Returns a dict with ``vector_results`` (raw Pinecone response),
        ``graph_results`` (list of row dicts) and ``query_entities``.
        Graph results are only gathered when the extractor finds entities in the query.
        """
        namespace = self._namespace_for(founder_name)
        vector_results = self.vector_store.similarity_search(query, namespace, k)

        query_entities, _ = self.entity_extractor.extract_from_text(query, founder_name)
        graph_results = []
        if query_entities:
            # The founder's entity list does not depend on which query entity
            # was found, so fetch it once rather than once per entity.
            graph_results.extend(self.graph_store.query_entities_by_founder(founder_name, namespace))
            for entity in query_entities:
                graph_results.extend(self.graph_store.query_relationships(entity.name, namespace))

        return {
            'vector_results': vector_results,
            'graph_results': graph_results,
            'query_entities': query_entities
        }

    def answer_question(self, question: str, founder_name: str) -> str:
        """Answer ``question`` from the founder's transcripts and knowledge graph.

        Context is transcript chunks first, then graph entities and
        relationships, capped at 10 parts.
        """
        search_results = self.hybrid_search(question, founder_name)

        context_parts = []
        # Pinecone returns a query response, not Documents; the chunk text
        # lives in each match's metadata.
        for text in self._vector_match_texts(search_results['vector_results']):
            context_parts.append(f"Transcript: {text}")
        for result in search_results['graph_results']:
            if 'name' in result and 'context' in result:
                context_parts.append(f"Entity: {result['name']} - {result['context']}")
            elif 'source' in result and 'relation' in result:
                context_parts.append(f"Relationship: {result['source']} {result['relation']} {result['target']}")

        combined_context = "\n\n".join(context_parts[:10])  # Limit context size

        prompt = f"""
Based on the following information about {founder_name}'s meetings and business activities, answer the question comprehensively.
Context from transcripts and knowledge graph:
{combined_context}
Question: {question}
Provide a detailed answer based on the available information. If the information is insufficient, say so clearly.
"""
        # ``predict`` is deprecated in LangChain; ``invoke`` returns an AIMessage.
        return self.qa_llm.invoke(prompt).content
def main():
    """Example entry point: ask sample questions about one founder's meetings.

    Ingestion is commented out; enable the ``process_transcript_file`` call to
    (re)load the CSV first. The Neo4j driver is closed on exit.
    """
    config = ConfigManager()
    system = GraphRAGSystem(config)
    # Example: Process a transcript file
    founder_name = "Ronak"
    transcript_file = "adaptive-anand.csv"  # Your CSV file path
    try:
        # Uncomment to (re)ingest the transcript before asking questions.
        # system.process_transcript_file(transcript_file, founder_name)
        questions = [
            "What challenges did the founder discuss?",
            "Who are the key people mentioned in the meetings?",
            "What companies were discussed?",
            "What strategies were mentioned for growth?"
        ]
        for question in questions:
            print(f"\nQuestion: {question}")
            answer = system.answer_question(question, founder_name)
            print(f"Answer: {answer}")
            print("-" * 80)
    except Exception:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Error in main execution")
    finally:
        # Clean up connections
        system.graph_store.close()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.