Created
July 27, 2025 18:25
tkersey created this gist
Jul 27, 2025
# Deep Analysis: OpenAI Agents Python SDK

Based on my comprehensive exploration of the codebase, here's how this SDK handles multi-agent systems:

## 1. Agent Composition Patterns

The SDK supports several powerful composition patterns:

### a) Handoffs (Agent-to-Agent Delegation)

- Agents can transfer control to other agents through the `handoffs` mechanism
- Handoffs appear as tools to the LLM, allowing dynamic routing based on context
- Example: A triage agent routing to language-specific agents (src/agents/handoffs.py:58-268)
- Handoffs can include input filters to transform conversation history before passing control

### b) Agents as Tools

- Any agent can be exposed as a tool to other agents via `agent.as_tool()`
- This enables hierarchical agent structures where specialized agents are invoked for specific tasks
- Example: Financial research agent using fundamentals and risk analysis agents as tools (examples/financial_research_agent/manager.py:102-112)

### c) Parallel Execution

- Multiple agents can run concurrently using `asyncio.gather()`
- Results can be aggregated by a coordinator agent
- Example: Running multiple translation agents in parallel and selecting the best result (examples/agent_patterns/parallelization.py:26-39)

## 2. Communication Mechanisms

### a) Message Passing

- Agents communicate through structured message items (`RunItem`)
- Messages preserve full conversation history including tool calls and responses
- Each agent receives the complete context when invoked

### b) Context Objects

- Shared mutable context via `RunContextWrapper[TContext]` (src/agents/run_context.py:11-27)
- Context is passed to all tools, guardrails, and hooks
- Enables dependency injection and state sharing across agent components

### c) Streaming Events

- Real-time communication through event streams
- Events include agent starts/ends, handoffs, tool calls, and custom events
- Enables reactive UI updates and monitoring

## 3. Execution Patterns

### a) Sequential Execution (Default)

- Agent loop runs until final output is produced (src/agents/run.py:372-498)
- Turn-based execution with configurable `max_turns` limit
- Each turn: LLM call → Tool execution → Handoff/Continue/Finish

### b) Concurrent Tool Execution

- Tools within a single agent run concurrently by default
- Implemented using `asyncio.gather()` for parallel tool calls
- Results are collected and passed back to the LLM

### c) Streaming Execution

- Non-blocking execution with event streaming (src/agents/run.py:526-594)
- Allows real-time UI updates while agents process
- Maintains same execution semantics as non-streaming mode

## 4. State Management

### a) Session Memory

- Built-in conversation persistence via `Session` interface
- SQLite implementation provided, custom backends supported
- Automatic history management across agent runs

### b) Agent State

- Agents are stateless by design; state lives in context and messages
- Each agent run creates new execution context
- State persistence handled through sessions and context objects

### c) Tool Use Tracking

- Tracks which tools agents have used to prevent infinite loops
- Configurable tool choice reset behavior after tool calls

## 5. Context Management Deep Dive

The SDK provides comprehensive context management capabilities for complex multi-agent workflows:

### a) Core Context Architecture

**RunContextWrapper[TContext]**

```python
@dataclass
class RunContextWrapper(Generic[TContext]):
    context: TContext  # Your custom context object
    usage: Usage       # Tracks LLM usage across the run
```

- Generic type parameter `TContext` ensures type safety
- Context is mutable and passed by reference
- All components receive the same context instance

### b) Context Access Points

**1. Tool Functions**

```python
@function_tool
async def my_tool(context: RunContextWrapper[MyContext], param: str) -> str:
    # Access custom context
    db = context.context.database_connection
    user = context.context.current_user

    # Modify context
    context.context.processed_items.append(param)

    return await db.query(param)
```

**2. Guardrails**

```python
from agents import Agent, GuardrailFunctionOutput, RunContextWrapper, input_guardrail

@input_guardrail
async def admin_only(
    context: RunContextWrapper[MyContext], agent: Agent, input: str | list
) -> GuardrailFunctionOutput:
    # Trip the guardrail (block the run) for non-admin users
    is_admin = context.context.user_role == "admin"
    return GuardrailFunctionOutput(output_info=None, tripwire_triggered=not is_admin)
```

**3. Lifecycle Hooks**

```python
class MyHooks(AgentHooks[MyContext]):
    async def on_start(self, context: RunContextWrapper[MyContext], agent: Agent):
        # Initialize agent-specific resources
        context.context.agent_resources[agent.name] = ResourcePool()
```

**4. Handoff Filters**

```python
def context_transformer(handoff_data: HandoffInputData) -> HandoffInputData:
    # Transform context between agents
    # Can modify input history, items, etc.
    return handoff_data

# Named `filtered_handoff` so the `handoff` helper is not shadowed
filtered_handoff = handoff(
    target_agent,
    input_filter=context_transformer,
)
```

### c) Context Persistence Patterns

**1.
External State Store Integration**

```python
from __future__ import annotations

import json
from dataclasses import dataclass
from typing import Any

import asyncpg
from redis.asyncio import Redis


@dataclass
class PersistentContext:
    session_id: str
    redis_client: Redis
    postgres_conn: asyncpg.Connection
    state_cache: dict[str, Any]

    async def persist(self):
        # Save to external stores, keyed by session so restore() reads the same key
        await self.redis_client.set(f"context_{self.session_id}", json.dumps(self.state_cache))
        await self.postgres_conn.execute("INSERT INTO context_history ...")

    @classmethod
    async def restore(cls, session_id: str) -> PersistentContext:
        # Restore from external stores
        redis_client = Redis.from_url(...)
        postgres = await asyncpg.connect(...)
        state = await redis_client.get(f"context_{session_id}")
        # Fall back to an empty cache when nothing has been persisted yet
        return cls(session_id, redis_client, postgres, json.loads(state) if state else {})
```

**2. Context Checkpointing**

```python
class CheckpointingHooks(RunHooks[PersistentContext]):
    async def on_agent_end(
        self, context: RunContextWrapper[PersistentContext], agent: Agent, output: Any
    ):
        # Checkpoint after each agent completes
        await context.context.persist()
```

### d) Dynamic Context Modification

**1. Context Enrichment Pattern**

```python
# Each agent enriches context with its results.
# Tools are defined first, then passed to the agents that use them.
@function_tool
async def save_research(context: RunContextWrapper, data: str) -> str:
    context.context.research_data.append(data)
    return "Saved"

@function_tool
async def analyze(context: RunContextWrapper) -> str:
    # Access enriched context from previous agent
    research = context.context.research_data
    analysis = perform_analysis(research)
    context.context.analysis_results = analysis
    return analysis

research_agent = Agent(name="Researcher", tools=[save_research])
analysis_agent = Agent(name="Analyst", tools=[analyze])
```

**2.
Context Isolation Between Agents**

```python
class IsolatedContext:
    def __init__(self):
        self.global_state = {}  # Shared across all agents
        self.agent_state = {}   # Isolated per agent

    def get_agent_context(self, agent_name: str):
        if agent_name not in self.agent_state:
            self.agent_state[agent_name] = {}
        return self.agent_state[agent_name]

# In handoff filter
def isolate_context(handoff_data: HandoffInputData) -> HandoffInputData:
    # Clear sensitive data before handoff.
    # NOTE: this assumes the handoff data exposes the run context as `.context`;
    # check your SDK version, since the attribute may be named differently or absent.
    context = handoff_data.context
    context.context.agent_state.clear()
    return handoff_data
```

### e) External Resource Management

**1. Connection Pooling**

```python
@dataclass
class ResourceContext:
    _db_pool: asyncpg.Pool
    _http_session: aiohttp.ClientSession
    _api_clients: dict[str, Any]

    async def get_db_connection(self) -> asyncpg.Connection:
        # Callers should hand the connection back with `_db_pool.release(conn)`
        return await self._db_pool.acquire()

    def get_api_client(self, service: str) -> Any:
        if service not in self._api_clients:
            self._api_clients[service] = create_client(service)
        return self._api_clients[service]

    async def cleanup(self):
        await self._db_pool.close()
        await self._http_session.close()
```

**2. Resource Lifecycle Management**

```python
from contextlib import asynccontextmanager
from typing import AsyncIterator

class ResourceManager:
    @classmethod
    @asynccontextmanager
    async def create_context(cls) -> AsyncIterator[ResourceContext]:
        # Initialize resources
        db_pool = await asyncpg.create_pool(...)
        http_session = aiohttp.ClientSession()

        context = ResourceContext(db_pool, http_session, {})
        try:
            yield context
        finally:
            # Cleanup resources
            await context.cleanup()

# Usage
async with ResourceManager.create_context() as resource_ctx:
    result = await Runner.run(
        agent,
        input="Process data",
        context=resource_ctx,
    )
```

### f) Concurrent Context Access

**1.
Thread-Safe Context Updates**

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any

@dataclass
class ConcurrentContext:
    # asyncio.Lock serializes coroutines on one event loop;
    # it does not protect against access from other OS threads
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock)
    _data: dict = field(default_factory=dict)

    async def update(self, key: str, value: Any):
        async with self._lock:
            self._data[key] = value

    async def get(self, key: str) -> Any:
        async with self._lock:
            return self._data.get(key)
```

**2. Context Merging for Parallel Agents**

```python
import asyncio
import copy

async def run_parallel_agents(base_context: MyContext):
    # Create context copies for parallel execution
    contexts = [copy.deepcopy(base_context) for _ in range(3)]

    # Run agents in parallel
    results = await asyncio.gather(
        Runner.run(agent1, "Task 1", context=contexts[0]),
        Runner.run(agent2, "Task 2", context=contexts[1]),
        Runner.run(agent3, "Task 3", context=contexts[2]),
    )

    # Merge contexts back (`merge_results` is a method you define on MyContext)
    for ctx in contexts:
        base_context.merge_results(ctx)

    return results
```

### g) Context Transformation Between Agents

**1. Schema Evolution**

```python
class ContextAdapter:
    @staticmethod
    def v1_to_v2(old_context: ContextV1) -> ContextV2:
        return ContextV2(
            user_id=old_context.user_id,
            preferences=old_context.get_preferences(),
            # New fields with defaults
            feature_flags={},
            api_version="v2",
        )

# A handoff input_filter must return HandoffInputData, so it cannot swap the
# context type. Convert the context before starting a run with the new agent.
# (`user_input` is a placeholder for the conversation input.)
result = await Runner.run(
    new_agent,
    user_input,
    context=ContextAdapter.v1_to_v2(old_context),
)
```

**2.
Domain-Specific Context Views**

```python
class MultiDomainContext:
    def __init__(self):
        self.financial_data = FinancialContext()
        self.user_data = UserContext()
        self.system_data = SystemContext()

    def get_view(self, domain: str):
        """Return domain-specific view of context"""
        views = {
            "financial": self.financial_data,
            "user": self.user_data,
            "system": self.system_data,
        }
        return views.get(domain)

# Agents access only their domain
@function_tool
async def analyze(context: RunContextWrapper[MultiDomainContext]):
    fin_ctx = context.context.get_view("financial")
    # Only sees financial data
    return fin_ctx.calculate_metrics()

financial_agent = Agent(name="FinancialAnalyst", tools=[analyze])
```

### h) Best Practices for Context Management

1. **Keep Context Serializable**: Use dataclasses or Pydantic models for easy persistence
2. **Minimize Context Size**: Store references/IDs rather than full objects when possible
3. **Use Type Hints**: Leverage `Generic[TContext]` so static type checkers can catch mismatches
4. **Handle Cleanup**: Implement proper resource cleanup in hooks or context managers
5. **Version Your Context**: Plan for schema evolution from the start
6. **Isolate Sensitive Data**: Use handoff filters to prevent data leakage between agents
7. **Monitor Context Growth**: Track context size to prevent memory issues
8. **Use Async Locks**: Protect shared state in concurrent scenarios

## 6. Deployment and Hosting Guide

The SDK is designed to be deployed in various ways, from simple API servers to complex distributed systems. Here's how to share your agents with the world:

### a) Basic API Server Deployment

**1.
FastAPI Integration (Recommended)**

```python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from agents import Agent, Runner

# Define your agent
agent = Agent(
    name="MyAssistant",
    instructions="You are a helpful assistant.",
    tools=[...],  # Your tools
)

app = FastAPI()

class ChatRequest(BaseModel):
    message: str
    session_id: str | None = None

class ChatResponse(BaseModel):
    response: str
    session_id: str

@app.post("/chat")
async def chat(request: ChatRequest) -> ChatResponse:
    try:
        result = await Runner.run(agent, request.message)
        return ChatResponse(
            response=result.final_output,
            session_id=request.session_id or "default",
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

# Run with: uvicorn main:app --host 0.0.0.0 --port 8000
```

**2. Streaming API Endpoint**

```python
from fastapi import FastAPI
from starlette.responses import StreamingResponse
from agents import Agent, Runner, RawResponsesStreamEvent
import json

app = FastAPI()

@app.post("/chat/stream")
async def stream_chat(request: ChatRequest):
    result = Runner.run_streamed(agent, request.message)

    async def event_generator():
        async for event in result.stream_events():
            if isinstance(event, RawResponsesStreamEvent):
                yield f"data: {json.dumps({'type': event.type, 'data': str(event.data)})}\n\n"

        yield f"data: {json.dumps({'type': 'done', 'final': result.final_output})}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
    )
```

**3.
WebSocket Real-time Communication**

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import json

app = FastAPI()
active_sessions = {}

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    await websocket.accept()

    try:
        while True:
            data = await websocket.receive_text()
            message = json.loads(data)

            # Run agent
            result = Runner.run_streamed(agent, message["text"])

            # Stream responses
            async for event in result.stream_events():
                await websocket.send_json({
                    "type": "stream",
                    "data": str(event),
                })

            # Send final result
            await websocket.send_json({
                "type": "final",
                "data": result.final_output,
            })

    except WebSocketDisconnect:
        pass
```

### b) Production Deployment Patterns

**1. Containerized Deployment (Docker)**

```dockerfile
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .

# Run the application
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
```

```yaml
# docker-compose.yml
version: '3.8'
services:
  agent-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    volumes:
      - ./data:/app/data  # For SQLite sessions
    restart: unless-stopped
```

**2. Kubernetes Deployment**

```yaml
# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: agent-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: agent-api
  template:
    metadata:
      labels:
        app: agent-api
    spec:
      containers:
        - name: agent-api
          image: your-registry/agent-api:latest
          ports:
            - containerPort: 8000
          env:
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: agent-secrets
                  key: openai-api-key
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "1Gi"
              cpu: "1000m"
---
apiVersion: v1
kind: Service
metadata:
  name: agent-api-service
spec:
  selector:
    app: agent-api
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: LoadBalancer
```

### c) Serverless Deployment

**1.
AWS Lambda with Mangum**

```python
# lambda_handler.py
from mangum import Mangum
from fastapi import FastAPI
from agents import Agent, Runner

app = FastAPI()
agent = Agent(...)

@app.post("/chat")
async def chat(message: str):
    result = await Runner.run(agent, message)
    return {"response": result.final_output}

# Lambda handler
handler = Mangum(app)
```

**2. Vercel/Netlify Functions**

```python
# api/chat.py
from http.server import BaseHTTPRequestHandler
import json
from agents import Agent, Runner
import asyncio

agent = Agent(...)

class handler(BaseHTTPRequestHandler):
    def do_POST(self):
        content_length = int(self.headers['Content-Length'])
        post_data = self.rfile.read(content_length)
        data = json.loads(post_data)

        # Run agent; asyncio.run creates the event loop and closes it afterwards
        result = asyncio.run(Runner.run(agent, data['message']))

        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(json.dumps({
            'response': result.final_output
        }).encode())
```

### d) Multi-Agent API Architecture

**1.
Agent Registry Pattern**

```python
from fastapi import FastAPI, HTTPException
from typing import Dict
from agents import Agent, Runner

class AgentRegistry:
    def __init__(self):
        self._agents: Dict[str, Agent] = {}

    def register(self, name: str, agent: Agent):
        self._agents[name] = agent

    def get(self, name: str) -> Agent | None:
        return self._agents.get(name)

    def list_agents(self) -> list[str]:
        return list(self._agents.keys())

# Initialize registry
registry = AgentRegistry()

# Register your agents
registry.register("customer_support", customer_support_agent)
registry.register("sales", sales_agent)
registry.register("technical", technical_agent)

app = FastAPI()

@app.get("/agents")
async def list_agents():
    return {"agents": registry.list_agents()}

@app.post("/chat/{agent_name}")
async def chat_with_agent(agent_name: str, message: str):
    agent = registry.get(agent_name)
    if not agent:
        raise HTTPException(404, f"Agent {agent_name} not found")

    result = await Runner.run(agent, message)
    return {"response": result.final_output}
```

**2. Load Balancing Multiple Agent Instances**

```python
import asyncio
from agents import Agent, Runner

class AgentPool:
    def __init__(self, agent: Agent, pool_size: int = 5):
        self.agent = agent
        # The semaphore caps how many runs execute at once; extra requests wait
        self.semaphore = asyncio.Semaphore(pool_size)

    async def run(self, message: str):
        async with self.semaphore:
            return await Runner.run(self.agent, message)

# Create agent pool
agent_pool = AgentPool(agent, pool_size=10)

@app.post("/chat")
async def chat(message: str):
    # Automatically manages concurrent requests
    result = await agent_pool.run(message)
    return {"response": result.final_output}
```

### e) Authentication and Rate Limiting

**1.
API Key Authentication**

```python
from fastapi import Depends, FastAPI, HTTPException, Security
from fastapi.security import APIKeyHeader
import redis.asyncio as redis

app = FastAPI()
api_key_header = APIKeyHeader(name="X-API-Key")
redis_client = redis.Redis()  # Async client, so lookups do not block the event loop

async def verify_api_key(api_key: str = Security(api_key_header)) -> str:
    # Check if API key exists in database/cache
    if not await redis_client.exists(f"api_key:{api_key}"):
        raise HTTPException(401, "Invalid API key")
    return api_key

@app.post("/chat")
async def chat(message: str, api_key: str = Depends(verify_api_key)):
    # Track usage
    await redis_client.hincrby(f"usage:{api_key}", "requests", 1)

    result = await Runner.run(agent, message)
    return {"response": result.final_output}
```

**2. Rate Limiting**

```python
from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
# Register the handler for slowapi's exception type so limited calls return 429
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/chat")
@limiter.limit("10/minute")
async def chat(request: Request, message: str):
    result = await Runner.run(agent, message)
    return {"response": result.final_output}
```

### f) Production Best Practices

**1.
Health Checks and Monitoring**

```python
import time

from fastapi import FastAPI, Request, Response
from prometheus_client import CONTENT_TYPE_LATEST, Counter, Histogram, generate_latest

app = FastAPI()

# Metrics
request_count = Counter('agent_requests_total', 'Total requests')
request_duration = Histogram('agent_request_duration_seconds', 'Request duration')

@app.get("/health")
async def health_check():
    # Check agent availability (note: this makes a real model call on every probe)
    try:
        await Runner.run(agent, "test")
        return {"status": "healthy", "agent": "responsive"}
    except Exception as e:
        return {"status": "unhealthy", "error": str(e)}

@app.get("/metrics")
async def metrics():
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)

@app.middleware("http")
async def track_metrics(request: Request, call_next):
    if request.url.path == "/chat":
        request_count.inc()
        start = time.time()

    response = await call_next(request)

    if request.url.path == "/chat":
        request_duration.observe(time.time() - start)

    return response
```

**2. Graceful Shutdown**

```python
import signal
import asyncio

shutdown_event = asyncio.Event()

def signal_handler(sig, frame):
    shutdown_event.set()

# Note: servers such as uvicorn install their own handlers for these signals,
# so only register custom handlers when you run the event loop yourself
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

@app.on_event("shutdown")
async def shutdown():
    # Clean up resources (both helpers are application-defined placeholders)
    await session_cleanup()
    await close_db_connections()
```

**3.
Session Persistence for Distributed Deployment**

```python
import json

import redis.asyncio as redis
from agents.memory import Session

class RedisSession(Session):
    def __init__(self, session_id: str, redis_client: redis.Redis):
        self.session_id = session_id
        self.redis = redis_client
        self.key = f"session:{session_id}"

    async def get_items(self, limit: int | None = None) -> list[dict]:
        # Fetch only the newest `limit` items when a limit is given
        start = -limit if limit else 0
        raw = await self.redis.lrange(self.key, start, -1)
        return [json.loads(item) for item in raw]

    async def add_items(self, items: list[dict]) -> None:
        if not items:
            return
        await self.redis.rpush(self.key, *(json.dumps(item) for item in items))
        # Set expiration (24 hours)
        await self.redis.expire(self.key, 86400)

    async def pop_item(self) -> dict | None:
        # Remove and return the most recent item
        raw = await self.redis.rpop(self.key)
        return json.loads(raw) if raw else None

    async def clear_session(self) -> None:
        await self.redis.delete(self.key)

redis_client = redis.Redis()

# Use in API
@app.post("/chat")
async def chat(message: str, session_id: str):
    session = RedisSession(session_id, redis_client)
    result = await Runner.run(agent, message, session=session)
    return {"response": result.final_output}
```

### g) Scaling Strategies

1. **Horizontal Scaling**: Deploy multiple API instances behind a load balancer
2. **Caching**: Cache common queries using Redis/Memcached
3. **Queue-based Processing**: Use Celery/RQ for async processing of long-running tasks
4. **Database Optimization**: Use connection pooling and read replicas
5. **CDN Integration**: Serve static agent responses from edge locations
6. **Auto-scaling**: Configure based on CPU/memory usage or request rate

### h) Deployment Checklist

- [ ] Set up environment variables (API keys, configuration)
- [ ] Configure logging and monitoring
- [ ] Implement health checks
- [ ] Set up rate limiting and authentication
- [ ] Configure CORS if needed
- [ ] Set up SSL/TLS certificates
- [ ] Configure database/session persistence
- [ ] Set up backup and disaster recovery
- [ ] Implement graceful shutdown
- [ ] Configure auto-scaling policies
- [ ] Set up CI/CD pipeline
- [ ] Document API endpoints
- [ ] Load test the deployment
- [ ] Set up error tracking (Sentry, etc.)
- [ ] Configure observability (traces, metrics, logs)

The SDK's stateless design and async-first architecture make it ideal for modern cloud deployments, whether you're serving a few users or scaling to millions.

## 7. Production Considerations

### a) Scalability

- Stateless agent design enables horizontal scaling
- Async-first implementation for high concurrency
- Session backends can be distributed (Redis, etc.)

### b) Reliability

- Comprehensive error handling with custom exceptions
- Guardrails for input/output validation
- Configurable timeouts and retry mechanisms

### c) Monitoring

- Rich tracing data with customizable metadata
- Usage tracking for cost management
- Event streaming for real-time monitoring

### d) Security

- Guardrail system for content filtering
- Configurable sensitive data exclusion from traces
- Tool permission management through context

## 8. Advanced Patterns

### a) Hierarchical Agent Systems

- Manager agents coordinating specialist agents
- Dynamic agent selection based on task requirements
- Result aggregation and synthesis

### b) Workflow Orchestration

- Complex multi-step workflows with conditional logic
- State machines implemented through handoffs
- Dynamic workflow adaptation based on results

### c) Resource Management

- Shared model clients across agents
- Connection pooling for external services
- Efficient message passing without duplication

## Design Philosophy

The SDK's design philosophy emphasizes:

- **Composability**: Small, focused agents that combine into complex systems
- **Flexibility**: Multiple patterns for different use cases
- **Observability**: Deep insights into agent behavior
- **Production-readiness**: Built for scale, monitoring, and reliability

This architecture enables building sophisticated multi-agent systems that can handle complex, real-world tasks while maintaining clarity and maintainability.
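
To make the turn-based loop from section 3a concrete (LLM call → Tool execution → Handoff/Continue/Finish, bounded by `max_turns`), here is a minimal standard-library sketch. It is a toy model, not the SDK's implementation: `ToyAgent`, `Step`, `run_loop`, and `MaxTurnsExceeded` are hypothetical names invented for this illustration, and the `decide` callables stand in for real model calls.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable


@dataclass
class Step:
    """One simulated model decision."""
    kind: str   # "tool", "handoff", or "final"
    value: str  # tool name, target agent name, or final output


@dataclass
class ToyAgent:
    name: str
    decide: Callable[[list[str]], Step]  # stand-in for the LLM call
    tools: dict[str, Callable[[], Awaitable[str]]] = field(default_factory=dict)


class MaxTurnsExceeded(RuntimeError):
    """Raised when no final output is produced within max_turns."""


async def run_loop(start: ToyAgent, agents: dict[str, ToyAgent],
                   user_input: str, max_turns: int = 10) -> tuple[str, str]:
    """Run turns until an agent finishes; return (agent name, final output)."""
    history = [f"user: {user_input}"]
    current = start
    for _ in range(max_turns):
        step = current.decide(history)
        if step.kind == "final":
            return current.name, step.value
        if step.kind == "handoff":
            # Control moves to another agent; the history travels with it
            history.append(f"handoff: {current.name} -> {step.value}")
            current = agents[step.value]
        elif step.kind == "tool":
            # Tool output is appended so the next decision can see it
            result = await current.tools[step.value]()
            history.append(f"tool {step.value}: {result}")
    raise MaxTurnsExceeded(f"no final output after {max_turns} turns")


async def lookup() -> str:
    return "hola"


def triage_decide(history: list[str]) -> Step:
    # The triage agent always routes to the Spanish agent in this toy setup
    return Step("handoff", "spanish")


def spanish_decide(history: list[str]) -> Step:
    # Call the tool once, then finish on the following turn
    if any(entry.startswith("tool lookup") for entry in history):
        return Step("final", "hola, mundo")
    return Step("tool", "lookup")


triage = ToyAgent("triage", triage_decide)
spanish = ToyAgent("spanish", spanish_decide, tools={"lookup": lookup})
AGENTS = {"triage": triage, "spanish": spanish}

if __name__ == "__main__":
    print(asyncio.run(run_loop(triage, AGENTS, "hello")))
```

In this sketch a handoff and a tool call each consume one turn, which is why a low `max_turns` can end a run before any agent produces a final output.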