import os
import csv
import hashlib
import logging
import uuid
import json
from typing import Dict, List, Tuple, Any
from retry import retry
from dotenv import load_dotenv
from neo4j import GraphDatabase
import pandas as pd
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from langchain_core.prompts import PromptTemplate
from pinecone import Pinecone, ServerlessSpec
import openai
from dataclasses import dataclass

# Load environment variables
load_dotenv(dotenv_path='.env')
openai.api_key = os.getenv('OPENAI_API_KEY')

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)

# Node labels that receive uniqueness constraints in Neo4j
NODES = [
    "Person",
    "Company",
    "Topic",
    "Founder"
]


@dataclass
class Entity:
    """Represents an extracted entity from transcript text"""
    name: str
    type: str  # PERSON, COMPANY, TOPIC, etc.
    context: str
    confidence: float = 0.8


@dataclass
class Relationship:
    """Represents a relationship between two entities"""
    source: str
    target: str
    relation_type: str
    context: str
    confidence: float = 0.8


class ConfigManager:
    """Manages configuration and API keys"""

    def __init__(self):
        self.required_keys = [
            'OPENAI_API_KEY',
            'PINECONE_API_KEY',
            'PINECONE_HOST',
            'NEO4J_URI',
            'NEO4J_USERNAME',
            'NEO4J_PASSWORD'
        ]
        self.validate_environment()

    def validate_environment(self):
        """Ensure all required environment variables are set"""
        missing_keys = [key for key in self.required_keys if not os.getenv(key)]
        if missing_keys:
            raise ValueError(f"Missing required environment variables: {missing_keys}")

    @property
    def openai_api_key(self) -> str:
        return os.getenv('OPENAI_API_KEY')

    @property
    def pinecone_config(self) -> Dict[str, str]:
        return {
            'api_key': os.getenv('PINECONE_API_KEY'),
            'host': os.getenv('PINECONE_HOST')
        }

    @property
    def neo4j_config(self) -> Dict[str, str]:
        return {
            'uri': os.getenv('NEO4J_URI'),
            'user': os.getenv('NEO4J_USERNAME'),
            'password': os.getenv('NEO4J_PASSWORD'),
            'database': os.getenv('NEO4J_DATABASE')
        }


@retry(tries=10, delay=10)
def set_uniqueness_constraints(tx, node):
    """Create a uniqueness constraint on the id property of the given node label."""
    query = f"""CREATE CONSTRAINT IF NOT EXISTS FOR (n:{node}) REQUIRE n.id IS UNIQUE;"""
    _ = tx.run(query)


class EntityExtractor:
    """Extracts entities and relationships from transcript text using LLM"""

    def __init__(self, config: ConfigManager):
        self.llm = ChatOpenAI(
            temperature=0.1,
            openai_api_key=config.openai_api_key,
            model_name="gpt-4"
        )

        # Prompt template for entity extraction
        self.entity_prompt = PromptTemplate(
            input_variables=["text", "founder_name"],
            template="""
            You are an expert at extracting structured information from business meeting transcripts.

            Founder: {founder_name}

            From the following transcript text, extract:
            1. PEOPLE mentioned (names, roles, companies they work for)
            2. COMPANIES mentioned
            3. TOPICS discussed (products, strategies, challenges, opportunities)
            4. RELATIONSHIPS between entities (who works with whom, what topics relate to what companies, etc.)

            Text: {text}

            Return your response as valid JSON with this structure:
            {{
                "entities": [
                    {{"name": "entity_name", "type": "PERSON|COMPANY|TOPIC", "context": "relevant_context", "confidence": 0.9}}
                ],
                "relationships": [
                    {{"source": "entity1", "target": "entity2", "relation_type": "WORKS_WITH|DISCUSSES|LEADS|etc", "context": "context", "confidence": 0.8}}
                ]
            }}

            Focus on business-relevant information. Be precise with entity names and relationship types.
            """
        )
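
    # Illustrative only (names below are made up, not from an actual transcript):
    # for a chunk such as "Speaker: Anand\nText: We spoke with Priya from Acme about pricing",
    # the prompt asks the model for JSON along these lines, which extract_from_text
    # below parses into Entity and Relationship objects:
    # {
    #   "entities": [
    #     {"name": "Priya", "type": "PERSON", "context": "works at Acme", "confidence": 0.9},
    #     {"name": "Acme", "type": "COMPANY", "context": "pricing discussion", "confidence": 0.9}
    #   ],
    #   "relationships": [
    #     {"source": "Priya", "target": "Acme", "relation_type": "WORKS_WITH",
    #      "context": "pricing discussion", "confidence": 0.8}
    #   ]
    # }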
""" ) def extract_from_text(self, text: str, founder_name: str) -> Tuple[List[Entity], List[Relationship]]: """Extract entities and relationships from a text chunk""" try: # Get LLM response prompt = self.entity_prompt.format(text=text, founder_name=founder_name) response = self.llm.predict(prompt) # Parse JSON response data = json.loads(response) # Convert to our data structures entities = [Entity(**entity) for entity in data.get('entities', [])] relationships = [Relationship(**rel) for rel in data.get('relationships', [])] return entities, relationships except Exception as e: logger.error(f"Error extracting entities: {e}") return [], [] class PineconeVectorStore: """Manages Pinecone vector storage operations""" def __init__(self, config: ConfigManager, index_name: str = "founder-transcripts"): # Initialize Pinecone self.pc = Pinecone( api_key=config.pinecone_config['api_key'] ) self.embeddings = OpenAIEmbeddings( openai_api_key=config.openai_api_key, model="text-embedding-3-large", dimensions=3072 ) if not self.pc.has_index(index_name): self.index = self.pc.create_index( name=index_name, dimension=3072, metric="cosine", spec=ServerlessSpec( cloud='aws', region='us-east-1' ) ) self.index = self.pc.Index(index_name) def add_documents(self, documents: List[Document], namespace: str = 'ns1', batch_size: int = 32): """Add documents to vector store with namespace""" doc_embeds = self.embed([doc.page_content for doc in documents]) vectors = [] for doc, embed in zip(documents, doc_embeds): metadata = doc.metadata.copy() if doc.metadata else {} metadata['text'] = doc.page_content vectors.append({ "id": str(hash(doc.page_content)), "values": embed, "metadata": metadata }) self.index.upsert( vectors=vectors, namespace=namespace, batch_size=batch_size ) def similarity_search(self, query: str, namespace: str = 'ns1', k: int = 5): """Search for similar documents within a namespace""" query_embedding = self.embed([query])[0] return self.index.query( namespace=namespace, vector=query_embedding, top_k=k, include_values=False, include_metadata=True ) def embed(self, texts: list[str]) -> list[list[float]]: """Embed texts using OpenAI's embedding model""" res = openai.embeddings.create( input=texts, model="text-embedding-3-large" ) return [r.embedding for r in res.data] class TranscriptProcessor: """Processes transcript files and chunks them appropriately""" def __init__(self): self.text_splitter = RecursiveCharacterTextSplitter( separators=[ "\n\n", "\n", " ", ".", ",", "\u200b", # Zero-width space "\uff0c", # Fullwidth comma "\u3001", # Ideographic comma "\uff0e", # Fullwidth full stop "\u3002", # Ideographic full stop "", ], chunk_size=1000, chunk_overlap=0, length_function=len, ) def load_csv_transcript(self, file_path: str) -> List[Dict]: """Load transcript from CSV file""" transcripts = [] with open(file_path, 'r', encoding='utf-8') as file: reader = csv.DictReader(file) for row in reader: transcripts.append(row) return transcripts def process_transcript_row(self, row: Dict) -> Document: """Convert a transcript row to a Document object""" # Combine speaker and text for full context content = f"Speaker: {row.get('speaker', 'Unknown')}\nText: {row.get('text', '')}" metadata = { 'speaker': row.get('speaker', 'Unknown'), 'timestamp': row.get('timestamp', ''), 'meeting_id': row.get('meeting_id', str(uuid.uuid4())), 'source': 'transcript' } return Document(page_content=content, metadata=metadata) def chunk_documents(self, documents: List[Document]) -> List[Document]: """Split documents into smaller chunks 


class Neo4jGraphStore:
    """Manages Neo4j graph database operations"""

    def __init__(self, config: ConfigManager):
        try:
            self.driver = GraphDatabase.driver(
                config.neo4j_config['uri'],
                auth=(config.neo4j_config['user'], config.neo4j_config['password'])
            )
            # The target database is a session-level setting, not a driver option
            self.database = config.neo4j_config['database']
            # Test the connection
            self.driver.verify_connectivity()
            logger.info("Successfully connected to Neo4j database")
        except Exception as e:
            logger.error(f"Failed to connect to Neo4j: {str(e)}")
            raise ValueError("Could not connect to Neo4j database. Please check your connection URI and credentials.")

        self.setup_constraints()

    def setup_constraints(self):
        """Set up database constraints and indexes"""
        with self.driver.session(database=self.database) as session:
            for node in NODES:
                session.execute_write(set_uniqueness_constraints, node)
                logger.info(f"Set uniqueness constraint for {node}")

    def add_founder(self, founder_name: str, namespace: str):
        """Add a founder node with namespace isolation"""
        with self.driver.session(database=self.database) as session:
            session.run(
                """
                MERGE (f:Founder {name: $founder_name, namespace: $namespace})
                SET f.created_at = datetime()
                """,
                founder_name=founder_name,
                namespace=namespace
            )

    def add_entities(self, entities: List[Entity], founder_name: str, namespace: str):
        """Add entities to the graph with founder association"""
        with self.driver.session(database=self.database) as session:
            for entity in entities:
                # Normalize the LLM's upper-case types (PERSON, COMPANY, TOPIC) to the
                # label casing used in NODES so the uniqueness constraints apply
                label = entity.type.capitalize()

                # Add the entity node
                session.run(
                    f"""
                    MERGE (e:{label} {{name: $name, namespace: $namespace}})
                    SET e.context = $context,
                        e.confidence = $confidence,
                        e.updated_at = datetime()
                    """,
                    name=entity.name,
                    namespace=namespace,
                    context=entity.context,
                    confidence=entity.confidence
                )

                # Link to founder
                session.run(
                    f"""
                    MATCH (f:Founder {{name: $founder_name, namespace: $namespace}})
                    MATCH (e:{label} {{name: $entity_name, namespace: $namespace}})
                    MERGE (f)-[:MENTIONS]->(e)
                    """,
                    founder_name=founder_name,
                    entity_name=entity.name,
                    namespace=namespace
                )

    def add_relationships(self, relationships: List[Relationship], namespace: str):
        """Add relationships between entities"""
        with self.driver.session(database=self.database) as session:
            for rel in relationships:
                session.run(
                    """
                    MATCH (a {name: $source, namespace: $namespace})
                    MATCH (b {name: $target, namespace: $namespace})
                    CALL apoc.create.relationship(a, $relation_type, {
                        context: $context,
                        confidence: $confidence,
                        created_at: datetime()
                    }, b) YIELD rel
                    RETURN rel
                    """,
                    source=rel.source,
                    target=rel.target,
                    relation_type=rel.relation_type,
                    context=rel.context,
                    confidence=rel.confidence,
                    namespace=namespace
                )

    def query_entities_by_founder(self, founder_name: str, namespace: str, entity_type: str = None) -> List[Dict]:
        """Query entities associated with a specific founder"""
        with self.driver.session(database=self.database) as session:
            if entity_type:
                query = f"""
                MATCH (f:Founder {{name: $founder_name, namespace: $namespace}})-[:MENTIONS]->(e:{entity_type})
                RETURN e.name as name, e.context as context, e.confidence as confidence
                """
            else:
                query = """
                MATCH (f:Founder {name: $founder_name, namespace: $namespace})-[:MENTIONS]->(e)
                RETURN labels(e)[0] as type, e.name as name, e.context as context, e.confidence as confidence
                """

            result = session.run(query, founder_name=founder_name, namespace=namespace)
            return [record.data() for record in result]

    def query_relationships(self, entity_name: str, namespace: str) -> List[Dict]:
        """Query relationships for a specific entity"""
        with self.driver.session(database=self.database) as session:
            result = session.run(
                """
                MATCH (a {name: $entity_name, namespace: $namespace})-[r]->(b)
                RETURN a.name as source, type(r) as relation, b.name as target, r.context as context
                UNION
                MATCH (a)-[r]->(b {name: $entity_name, namespace: $namespace})
                RETURN a.name as source, type(r) as relation, b.name as target, r.context as context
                """,
                entity_name=entity_name,
                namespace=namespace
            )
            return [record.data() for record in result]

    def close(self):
        """Close database connection"""
        self.driver.close()
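

# Note: add_relationships relies on apoc.create.relationship to create relationships
# whose type is chosen at runtime, so the APOC plugin must be installed on the Neo4j
# server. A plain-Cypher fallback (sketch only, assumes a fixed relationship type) would be:
#   MERGE (a)-[r:RELATED_TO]->(b) SET r.context = $context, r.confidence = $confidence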


class GraphRAGSystem:
    """Main system that orchestrates graph and vector operations"""

    def __init__(self, config: ConfigManager):
        self.config = config
        self.graph_store = Neo4jGraphStore(config)
        self.vector_store = PineconeVectorStore(config)
        self.transcript_processor = TranscriptProcessor()
        self.entity_extractor = EntityExtractor(config)
        self.qa_llm = ChatOpenAI(
            temperature=0.2,
            openai_api_key=config.openai_api_key,
            model_name="gpt-4"
        )

    def process_transcript_file(self, file_path: str, founder_name: str):
        """Process a complete transcript file for a founder"""
        logger.info(f"Processing transcript file for {founder_name}")

        # Create namespace for this founder
        namespace = founder_name.lower().replace(' ', '_')

        # Add founder to graph
        self.graph_store.add_founder(founder_name, namespace)

        # Load and process transcript
        transcript_rows = self.transcript_processor.load_csv_transcript(file_path)
        documents = [self.transcript_processor.process_transcript_row(row) for row in transcript_rows]

        # Chunk documents for better processing
        chunked_docs = self.transcript_processor.chunk_documents(documents)

        # Add to vector store with optimized batching
        # Process in smaller batches to avoid context length issues
        batch_size = 32  # Smaller batch size for upserts
        embedding_chunk_size = 100  # Smaller chunks for embeddings to avoid context length issues
        total_batches = (len(chunked_docs) + embedding_chunk_size - 1) // embedding_chunk_size

        for i in range(0, len(chunked_docs), embedding_chunk_size):
            batch_docs = chunked_docs[i:i + embedding_chunk_size]
            logger.info(f"Processing batch {i // embedding_chunk_size + 1} of {total_batches}")
            self.vector_store.add_documents(
                batch_docs,
                namespace=namespace,
                batch_size=batch_size
            )

        # Extract entities and relationships from each chunk
        all_entities = []
        all_relationships = []

        for doc in chunked_docs:
            entities, relationships = self.entity_extractor.extract_from_text(
                doc.page_content, founder_name
            )
            all_entities.extend(entities)
            all_relationships.extend(relationships)

        # Add to graph store
        if all_entities:
            self.graph_store.add_entities(all_entities, founder_name, namespace)
        if all_relationships:
            self.graph_store.add_relationships(all_relationships, namespace)

        logger.info(f"Processed {len(documents)} transcript entries, extracted {len(all_entities)} entities and {len(all_relationships)} relationships")

    def hybrid_search(self, query: str, founder_name: str, k: int = 5) -> Dict[str, Any]:
        """Perform hybrid search combining graph and vector results"""
        namespace = founder_name.lower().replace(' ', '_')

        # Vector search for semantic similarity
        vector_results = self.vector_store.similarity_search(query, namespace, k)

        # Graph search for entity-based queries
        # First, try to identify entities in the query
        query_entities, _ = self.entity_extractor.extract_from_text(query, founder_name)

        graph_results = []
        if query_entities:
            # Entities mentioned by this founder (fetched once; they do not depend on a specific query entity)
            graph_results.extend(self.graph_store.query_entities_by_founder(founder_name, namespace))
        for entity in query_entities:
            # Relationships touching each entity detected in the query
            graph_results.extend(self.graph_store.query_relationships(entity.name, namespace))

        return {
            'vector_results': vector_results,
            'graph_results': graph_results,
            'query_entities': query_entities
        }
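
    # Shape of the dict returned by hybrid_search (as consumed by answer_question below):
    #   'vector_results'  - Pinecone query response; each match carries the chunk text
    #                       in match.metadata['text'] (stored by add_documents)
    #   'graph_results'   - entity dicts (name/context/confidence) and relationship
    #                       dicts (source/relation/target/context) from Neo4j
    #   'query_entities'  - Entity objects detected in the question itself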

    def answer_question(self, question: str, founder_name: str) -> str:
        """Generate answer using hybrid GraphRAG approach"""
        # Get hybrid search results
        search_results = self.hybrid_search(question, founder_name)

        # Prepare context from both sources
        context_parts = []

        # Add vector search context (Pinecone returns matches whose metadata holds the chunk text)
        for match in search_results['vector_results'].matches:
            chunk_text = (match.metadata or {}).get('text', '')
            if chunk_text:
                context_parts.append(f"Transcript: {chunk_text}")

        # Add graph context
        for result in search_results['graph_results']:
            if 'name' in result and 'context' in result:
                context_parts.append(f"Entity: {result['name']} - {result['context']}")
            elif 'source' in result and 'relation' in result:
                context_parts.append(f"Relationship: {result['source']} {result['relation']} {result['target']}")

        # Combine context
        combined_context = "\n\n".join(context_parts[:10])  # Limit context size

        # Generate answer using LLM with enhanced context
        prompt = f"""
        Based on the following information about {founder_name}'s meetings and business activities,
        answer the question comprehensively.

        Context from transcripts and knowledge graph:
        {combined_context}

        Question: {question}

        Provide a detailed answer based on the available information.
        If the information is insufficient, say so clearly.
        """

        return self.qa_llm.predict(prompt)


def main():
    config = ConfigManager()
    system = GraphRAGSystem(config)

    # Example: Process a transcript file
    founder_name = "Ronak"
    transcript_file = "adaptive-anand.csv"  # Your CSV file path

    try:
        # Process the transcript
        # system.process_transcript_file(transcript_file, founder_name)

        # Ask questions
        questions = [
            "What challenges did the founder discuss?",
            "Who are the key people mentioned in the meetings?",
            "What companies were discussed?",
            "What strategies were mentioned for growth?"
        ]

        for question in questions:
            print(f"\nQuestion: {question}")
            answer = system.answer_question(question, founder_name)
            print(f"Answer: {answer}")
            print("-" * 80)

    except Exception as e:
        logger.error(f"Error in main execution: {e}")
    finally:
        # Clean up connections
        system.graph_store.close()


if __name__ == "__main__":
    main()