Skip to content

Instantly share code, notes, and snippets.

@tkersey
Created July 27, 2025 18:25
Show Gist options
  • Save tkersey/05f89b229a42610b1d378ee43b95de30 to your computer and use it in GitHub Desktop.
Save tkersey/05f89b229a42610b1d378ee43b95de30 to your computer and use it in GitHub Desktop.

Revisions

  1. tkersey created this gist Jul 27, 2025.
    900 changes: 900 additions & 0 deletions openai-agents.md
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,900 @@
    # Deep Analysis: OpenAI Agents Python SDK

    Based on my comprehensive exploration of the codebase, here's how this SDK handles multi-agent systems:

    ## 1. Agent Composition Patterns

    The SDK supports several powerful composition patterns:

    ### a) Handoffs (Agent-to-Agent Delegation)
    - Agents can transfer control to other agents through the `handoffs` mechanism
    - Handoffs appear as tools to the LLM, allowing dynamic routing based on context
    - Example: A triage agent routing to language-specific agents (src/agents/handoffs.py:58-268)
    - Handoffs can include input filters to transform conversation history before passing control

    ### b) Agents as Tools
    - Any agent can be exposed as a tool to other agents via `agent.as_tool()`
    - This enables hierarchical agent structures where specialized agents are invoked for specific tasks
    - Example: Financial research agent using fundamentals and risk analysis agents as tools (examples/financial_research_agent/manager.py:102-112)

    ### c) Parallel Execution
    - Multiple agents can run concurrently using `asyncio.gather()`
    - Results can be aggregated by a coordinator agent
    - Example: Running multiple translation agents in parallel and selecting the best result (examples/agent_patterns/parallelization.py:26-39)

    ## 2. Communication Mechanisms

    ### a) Message Passing
    - Agents communicate through structured message items (`RunItem`)
    - Messages preserve full conversation history including tool calls and responses
    - Each agent receives the complete context when invoked

    ### b) Context Objects
    - Shared mutable context via `RunContextWrapper[TContext]` (src/agents/run_context.py:11-27)
    - Context is passed to all tools, guardrails, and hooks
    - Enables dependency injection and state sharing across agent components

    ### c) Streaming Events
    - Real-time communication through event streams
    - Events include agent starts/ends, handoffs, tool calls, and custom events
    - Enables reactive UI updates and monitoring

    ## 3. Execution Patterns

    ### a) Sequential Execution (Default)
    - Agent loop runs until final output is produced (src/agents/run.py:372-498)
    - Turn-based execution with configurable max_turns limit
    - Each turn: LLM call → Tool execution → Handoff/Continue/Finish

    ### b) Concurrent Tool Execution
    - Tools within a single agent run concurrently by default
    - Implemented using `asyncio.gather()` for parallel tool calls
    - Results are collected and passed back to the LLM

    ### c) Streaming Execution
    - Non-blocking execution with event streaming (src/agents/run.py:526-594)
    - Allows real-time UI updates while agents process
    - Maintains same execution semantics as non-streaming mode

    ## 4. State Management

    ### a) Session Memory
    - Built-in conversation persistence via `Session` interface
    - SQLite implementation provided, custom backends supported
    - Automatic history management across agent runs

    ### b) Agent State
    - Agents are stateless by design - state lives in context and messages
    - Each agent run creates new execution context
    - State persistence handled through sessions and context objects

    ### c) Tool Use Tracking
    - Tracks which tools agents have used to prevent infinite loops
    - Configurable tool choice reset behavior after tool calls

    ## 5. Context Management Deep Dive

    The SDK provides comprehensive context management capabilities for complex multi-agent workflows:

    ### a) Core Context Architecture

    **RunContextWrapper[TContext]**
    ```python
    @dataclass
    class RunContextWrapper(Generic[TContext]):
    context: TContext # Your custom context object
    usage: Usage # Tracks LLM usage across the run
    ```

    - Generic type parameter `TContext` ensures type safety
    - Context is mutable and passed by reference
    - All components receive the same context instance

    ### b) Context Access Points

    **1. Tool Functions**
    ```python
    @function_tool
    async def my_tool(context: RunContextWrapper[MyContext], param: str) -> str:
    # Access custom context
    db = context.context.database_connection
    user = context.context.current_user

    # Modify context
    context.context.processed_items.append(param)

    return await db.query(param)
    ```

    **2. Guardrails**
    ```python
    class MyGuardrail(InputGuardrail[MyContext]):
    async def check(self, agent: Agent, input: str | list,
    context: RunContextWrapper[MyContext]) -> GuardrailOutput:
    if context.context.user_role != "admin":
    return GuardrailOutput(should_block=True)
    return GuardrailOutput(should_block=False)
    ```

    **3. Lifecycle Hooks**
    ```python
    class MyHooks(AgentHooks[MyContext]):
    async def on_start(self, context: RunContextWrapper[MyContext], agent: Agent):
    # Initialize agent-specific resources
    context.context.agent_resources[agent.name] = ResourcePool()
    ```

    **4. Handoff Filters**
    ```python
    def context_transformer(handoff_data: HandoffInputData) -> HandoffInputData:
    # Transform context between agents
    # Can modify input history, items, etc.
    return handoff_data

    handoff = handoff(
    target_agent,
    input_filter=context_transformer
    )
    ```

    ### c) Context Persistence Patterns

    **1. External State Store Integration**
    ```python
    @dataclass
    class PersistentContext:
    redis_client: Redis
    postgres_conn: asyncpg.Connection
    state_cache: dict[str, Any]

    async def persist(self):
    # Save to external stores
    await self.redis_client.set("context_state", json.dumps(self.state_cache))
    await self.postgres_conn.execute("INSERT INTO context_history ...")

    @classmethod
    async def restore(cls, session_id: str) -> PersistentContext:
    # Restore from external stores
    redis = await aioredis.create_redis_pool(...)
    postgres = await asyncpg.connect(...)
    state = await redis.get(f"context_{session_id}")
    return cls(redis, postgres, json.loads(state))
    ```

    **2. Context Checkpointing**
    ```python
    class CheckpointingHooks(RunHooks[PersistentContext]):
    async def on_agent_end(self, context: RunContextWrapper[PersistentContext],
    agent: Agent, result: Any):
    # Checkpoint after each agent completes
    await context.context.persist()
    ```

    ### d) Dynamic Context Modification

    **1. Context Enrichment Pattern**
    ```python
    # Each agent enriches context with its results
    research_agent = Agent(
    name="Researcher",
    tools=[
    @function_tool
    async def save_research(context: RunContextWrapper, data: str):
    context.context.research_data.append(data)
    return "Saved"
    ]
    )

    analysis_agent = Agent(
    name="Analyst",
    tools=[
    @function_tool
    async def analyze(context: RunContextWrapper):
    # Access enriched context from previous agent
    research = context.context.research_data
    analysis = perform_analysis(research)
    context.context.analysis_results = analysis
    return analysis
    ]
    )
    ```

    **2. Context Isolation Between Agents**
    ```python
    class IsolatedContext:
    def __init__(self):
    self.global_state = {} # Shared across all agents
    self.agent_state = {} # Isolated per agent

    def get_agent_context(self, agent_name: str):
    if agent_name not in self.agent_state:
    self.agent_state[agent_name] = {}
    return self.agent_state[agent_name]

    # In handoff filter
    def isolate_context(handoff_data: HandoffInputData) -> HandoffInputData:
    # Clear sensitive data before handoff
    context = handoff_data.context
    context.context.agent_state.clear()
    return handoff_data
    ```

    ### e) External Resource Management

    **1. Connection Pooling**
    ```python
    @dataclass
    class ResourceContext:
    _db_pool: asyncpg.Pool
    _http_session: aiohttp.ClientSession
    _api_clients: dict[str, Any]

    async def get_db_connection(self) -> asyncpg.Connection:
    return await self._db_pool.acquire()

    def get_api_client(self, service: str) -> Any:
    if service not in self._api_clients:
    self._api_clients[service] = create_client(service)
    return self._api_clients[service]

    async def cleanup(self):
    await self._db_pool.close()
    await self._http_session.close()
    ```

    **2. Resource Lifecycle Management**
    ```python
    class ResourceManager:
    @classmethod
    @asynccontextmanager
    async def create_context(cls) -> ResourceContext:
    # Initialize resources
    db_pool = await asyncpg.create_pool(...)
    http_session = aiohttp.ClientSession()

    context = ResourceContext(db_pool, http_session, {})
    try:
    yield context
    finally:
    # Cleanup resources
    await context.cleanup()

    # Usage
    async with ResourceManager.create_context() as resource_ctx:
    result = await Runner.run(
    agent,
    input="Process data",
    context=resource_ctx
    )
    ```

    ### f) Concurrent Context Access

    **1. Thread-Safe Context Updates**
    ```python
    import asyncio
    from threading import Lock

    @dataclass
    class ConcurrentContext:
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock)
    _data: dict = field(default_factory=dict)

    async def update(self, key: str, value: Any):
    async with self._lock:
    self._data[key] = value

    async def get(self, key: str) -> Any:
    async with self._lock:
    return self._data.get(key)
    ```

    **2. Context Merging for Parallel Agents**
    ```python
    async def run_parallel_agents(base_context: MyContext):
    # Create context copies for parallel execution
    contexts = [
    RunContextWrapper(context=copy.deepcopy(base_context))
    for _ in range(3)
    ]

    # Run agents in parallel
    results = await asyncio.gather(
    Runner.run(agent1, "Task 1", context=contexts[0].context),
    Runner.run(agent2, "Task 2", context=contexts[1].context),
    Runner.run(agent3, "Task 3", context=contexts[2].context),
    )

    # Merge contexts back
    for ctx in contexts:
    base_context.merge_results(ctx.context)

    return results
    ```

    ### g) Context Transformation Between Agents

    **1. Schema Evolution**
    ```python
    class ContextAdapter:
    @staticmethod
    def v1_to_v2(old_context: ContextV1) -> ContextV2:
    return ContextV2(
    user_id=old_context.user_id,
    preferences=old_context.get_preferences(),
    # New fields with defaults
    feature_flags={},
    api_version="v2"
    )

    # In handoff
    handoff = handoff(
    new_agent,
    input_filter=lambda data: ContextAdapter.v1_to_v2(data.context)
    )
    ```

    **2. Domain-Specific Context Views**
    ```python
    class MultiDomainContext:
    def __init__(self):
    self.financial_data = FinancialContext()
    self.user_data = UserContext()
    self.system_data = SystemContext()

    def get_view(self, domain: str):
    """Return domain-specific view of context"""
    views = {
    "financial": self.financial_data,
    "user": self.user_data,
    "system": self.system_data
    }
    return views.get(domain)

    # Agents access only their domain
    financial_agent = Agent(
    name="FinancialAnalyst",
    tools=[
    @function_tool
    async def analyze(context: RunContextWrapper[MultiDomainContext]):
    fin_ctx = context.context.get_view("financial")
    # Only sees financial data
    return fin_ctx.calculate_metrics()
    ]
    )
    ```

    ### h) Best Practices for Context Management

    1. **Keep Context Serializable**: Use dataclasses or Pydantic models for easy persistence
    2. **Minimize Context Size**: Store references/IDs rather than full objects when possible
    3. **Use Type Hints**: Leverage `Generic[TContext]` for compile-time safety
    4. **Handle Cleanup**: Implement proper resource cleanup in hooks or context managers
    5. **Version Your Context**: Plan for schema evolution from the start
    6. **Isolate Sensitive Data**: Use handoff filters to prevent data leakage between agents
    7. **Monitor Context Growth**: Track context size to prevent memory issues
    8. **Use Async Locks**: Protect shared state in concurrent scenarios

    ## 6. Deployment and Hosting Guide

    The SDK is designed to be deployed in various ways, from simple API servers to complex distributed systems. Here's how to share your agents with the world:

    ### a) Basic API Server Deployment

    **1. FastAPI Integration (Recommended)**
    ```python
    from fastapi import FastAPI, HTTPException
    from pydantic import BaseModel
    from agents import Agent, Runner

    # Define your agent
    agent = Agent(
    name="MyAssistant",
    instructions="You are a helpful assistant.",
    tools=[...], # Your tools
    )

    app = FastAPI()

    class ChatRequest(BaseModel):
    message: str
    session_id: str | None = None

    class ChatResponse(BaseModel):
    response: str
    session_id: str

    @app.post("/chat")
    async def chat(request: ChatRequest) -> ChatResponse:
    try:
    result = await Runner.run(agent, request.message)
    return ChatResponse(
    response=result.final_output,
    session_id=request.session_id or "default"
    )
    except Exception as e:
    raise HTTPException(status_code=500, detail=str(e))

    # Run with: uvicorn main:app --host 0.0.0.0 --port 8000
    ```

    **2. Streaming API Endpoint**
    ```python
    from fastapi import FastAPI
    from starlette.responses import StreamingResponse
    from agents import Agent, Runner, RawResponsesStreamEvent
    import json

    app = FastAPI()

    @app.post("/chat/stream")
    async def stream_chat(request: ChatRequest):
    result = Runner.run_streamed(agent, request.message)

    async def event_generator():
    async for event in result.stream_events():
    if isinstance(event, RawResponsesStreamEvent):
    yield f"data: {json.dumps({'type': event.type, 'data': str(event.data)})}\n\n"
    yield f"data: {json.dumps({'type': 'done', 'final': result.final_output})}\n\n"

    return StreamingResponse(
    event_generator(),
    media_type="text/event-stream"
    )
    ```

    **3. WebSocket Real-time Communication**
    ```python
    from fastapi import FastAPI, WebSocket, WebSocketDisconnect
    import json

    app = FastAPI()
    active_sessions = {}

    @app.websocket("/ws/{client_id}")
    async def websocket_endpoint(websocket: WebSocket, client_id: str):
    await websocket.accept()
    try:
    while True:
    data = await websocket.receive_text()
    message = json.loads(data)

    # Run agent
    result = Runner.run_streamed(agent, message["text"])

    # Stream responses
    async for event in result.stream_events():
    await websocket.send_json({
    "type": "stream",
    "data": str(event)
    })

    # Send final result
    await websocket.send_json({
    "type": "final",
    "data": result.final_output
    })
    except WebSocketDisconnect:
    pass
    ```

    ### b) Production Deployment Patterns

    **1. Containerized Deployment (Docker)**
    ```dockerfile
    # Dockerfile
    FROM python:3.11-slim

    WORKDIR /app

    # Install dependencies
    COPY requirements.txt .
    RUN pip install --no-cache-dir -r requirements.txt

    # Copy application
    COPY . .

    # Run the application
    CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
    ```

    ```yaml
    # docker-compose.yml
    version: '3.8'
    services:
    agent-api:
    build: .
    ports:
    - "8000:8000"
    environment:
    - OPENAI_API_KEY=${OPENAI_API_KEY}
    volumes:
    - ./data:/app/data # For SQLite sessions
    restart: unless-stopped
    ```
    **2. Kubernetes Deployment**
    ```yaml
    # deployment.yaml
    apiVersion: apps/v1
    kind: Deployment
    metadata:
    name: agent-api
    spec:
    replicas: 3
    selector:
    matchLabels:
    app: agent-api
    template:
    metadata:
    labels:
    app: agent-api
    spec:
    containers:
    - name: agent-api
    image: your-registry/agent-api:latest
    ports:
    - containerPort: 8000
    env:
    - name: OPENAI_API_KEY
    valueFrom:
    secretKeyRef:
    name: agent-secrets
    key: openai-api-key
    resources:
    requests:
    memory: "512Mi"
    cpu: "500m"
    limits:
    memory: "1Gi"
    cpu: "1000m"
    ---
    apiVersion: v1
    kind: Service
    metadata:
    name: agent-api-service
    spec:
    selector:
    app: agent-api
    ports:
    - protocol: TCP
    port: 80
    targetPort: 8000
    type: LoadBalancer
    ```
    ### c) Serverless Deployment
    **1. AWS Lambda with Mangum**
    ```python
    # lambda_handler.py
    from mangum import Mangum
    from fastapi import FastAPI
    from agents import Agent, Runner

    app = FastAPI()
    agent = Agent(...)

    @app.post("/chat")
    async def chat(message: str):
    result = await Runner.run(agent, message)
    return {"response": result.final_output}

    # Lambda handler
    handler = Mangum(app)
    ```

    **2. Vercel/Netlify Functions**
    ```python
    # api/chat.py
    from http.server import BaseHTTPRequestHandler
    import json
    from agents import Agent, Runner
    import asyncio

    agent = Agent(...)

    class handler(BaseHTTPRequestHandler):
    def do_POST(self):
    content_length = int(self.headers['Content-Length'])
    post_data = self.rfile.read(content_length)
    data = json.loads(post_data)

    # Run agent
    loop = asyncio.new_event_loop()
    result = loop.run_until_complete(
    Runner.run(agent, data['message'])
    )

    self.send_response(200)
    self.send_header('Content-type', 'application/json')
    self.end_headers()
    self.wfile.write(json.dumps({
    'response': result.final_output
    }).encode())
    ```

    ### d) Multi-Agent API Architecture

    **1. Agent Registry Pattern**
    ```python
    from fastapi import FastAPI
    from typing import Dict
    from agents import Agent, Runner

    class AgentRegistry:
    def __init__(self):
    self._agents: Dict[str, Agent] = {}

    def register(self, name: str, agent: Agent):
    self._agents[name] = agent

    def get(self, name: str) -> Agent | None:
    return self._agents.get(name)

    def list_agents(self) -> list[str]:
    return list(self._agents.keys())

    # Initialize registry
    registry = AgentRegistry()

    # Register your agents
    registry.register("customer_support", customer_support_agent)
    registry.register("sales", sales_agent)
    registry.register("technical", technical_agent)

    app = FastAPI()

    @app.get("/agents")
    async def list_agents():
    return {"agents": registry.list_agents()}

    @app.post("/chat/{agent_name}")
    async def chat_with_agent(agent_name: str, message: str):
    agent = registry.get(agent_name)
    if not agent:
    raise HTTPException(404, f"Agent {agent_name} not found")

    result = await Runner.run(agent, message)
    return {"response": result.final_output}
    ```

    **2. Load Balancing Multiple Agent Instances**
    ```python
    import asyncio
    from typing import List
    from agents import Agent, Runner

    class AgentPool:
    def __init__(self, agent: Agent, pool_size: int = 5):
    self.agent = agent
    self.semaphore = asyncio.Semaphore(pool_size)

    async def run(self, message: str):
    async with self.semaphore:
    return await Runner.run(self.agent, message)

    # Create agent pool
    agent_pool = AgentPool(agent, pool_size=10)

    @app.post("/chat")
    async def chat(message: str):
    # Automatically manages concurrent requests
    result = await agent_pool.run(message)
    return {"response": result.final_output}
    ```

    ### e) Authentication and Rate Limiting

    **1. API Key Authentication**
    ```python
    from fastapi import FastAPI, Header, HTTPException
    from fastapi.security import APIKeyHeader
    import redis

    app = FastAPI()
    api_key_header = APIKeyHeader(name="X-API-Key")
    redis_client = redis.Redis()

    async def verify_api_key(api_key: str = Header(...)):
    # Check if API key exists in database/cache
    if not redis_client.exists(f"api_key:{api_key}"):
    raise HTTPException(401, "Invalid API key")
    return api_key

    @app.post("/chat")
    async def chat(message: str, api_key: str = Depends(verify_api_key)):
    # Track usage
    redis_client.hincrby(f"usage:{api_key}", "requests", 1)

    result = await Runner.run(agent, message)
    return {"response": result.final_output}
    ```

    **2. Rate Limiting**
    ```python
    from fastapi import FastAPI, Request
    from slowapi import Limiter, _rate_limit_exceeded_handler
    from slowapi.util import get_remote_address

    limiter = Limiter(key_func=get_remote_address)
    app = FastAPI()
    app.state.limiter = limiter
    app.add_exception_handler(429, _rate_limit_exceeded_handler)

    @app.post("/chat")
    @limiter.limit("10/minute")
    async def chat(request: Request, message: str):
    result = await Runner.run(agent, message)
    return {"response": result.final_output}
    ```

    ### f) Production Best Practices

    **1. Health Checks and Monitoring**
    ```python
    from fastapi import FastAPI
    from prometheus_client import Counter, Histogram, generate_latest
    import time

    # Metrics
    request_count = Counter('agent_requests_total', 'Total requests')
    request_duration = Histogram('agent_request_duration_seconds', 'Request duration')

    @app.get("/health")
    async def health_check():
    # Check agent availability
    try:
    result = await Runner.run(agent, "test")
    return {"status": "healthy", "agent": "responsive"}
    except Exception as e:
    return {"status": "unhealthy", "error": str(e)}

    @app.get("/metrics")
    async def metrics():
    return Response(generate_latest(), media_type="text/plain")

    @app.middleware("http")
    async def track_metrics(request: Request, call_next):
    if request.url.path == "/chat":
    request_count.inc()
    start = time.time()

    response = await call_next(request)

    if request.url.path == "/chat":
    request_duration.observe(time.time() - start)

    return response
    ```

    **2. Graceful Shutdown**
    ```python
    import signal
    import asyncio

    shutdown_event = asyncio.Event()

    def signal_handler(sig, frame):
    shutdown_event.set()

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    @app.on_event("shutdown")
    async def shutdown():
    # Clean up resources
    await session_cleanup()
    await close_db_connections()
    ```

    **3. Session Persistence for Distributed Deployment**
    ```python
    from agents.memory import Session
    import redis
    import json

    class RedisSession(Session):
    def __init__(self, session_id: str, redis_client: redis.Redis):
    self.session_id = session_id
    self.redis = redis_client
    self.key = f"session:{session_id}"

    async def get_items(self, limit: int | None = None) -> List[dict]:
    items = self.redis.lrange(self.key, 0, -1)
    items = [json.loads(item) for item in items]
    if limit:
    return items[-limit:]
    return items

    async def add_items(self, items: List[dict]) -> None:
    for item in items:
    self.redis.rpush(self.key, json.dumps(item))
    # Set expiration (24 hours)
    self.redis.expire(self.key, 86400)

    # Use in API
    @app.post("/chat")
    async def chat(message: str, session_id: str):
    session = RedisSession(session_id, redis_client)
    result = await Runner.run(agent, message, session=session)
    return {"response": result.final_output}
    ```

    ### g) Scaling Strategies

    1. **Horizontal Scaling**: Deploy multiple API instances behind a load balancer
    2. **Caching**: Cache common queries using Redis/Memcached
    3. **Queue-based Processing**: Use Celery/RQ for async processing of long-running tasks
    4. **Database Optimization**: Use connection pooling and read replicas
    5. **CDN Integration**: Serve static agent responses from edge locations
    6. **Auto-scaling**: Configure based on CPU/memory usage or request rate

    ### h) Deployment Checklist

    - [ ] Set up environment variables (API keys, configuration)
    - [ ] Configure logging and monitoring
    - [ ] Implement health checks
    - [ ] Set up rate limiting and authentication
    - [ ] Configure CORS if needed
    - [ ] Set up SSL/TLS certificates
    - [ ] Configure database/session persistence
    - [ ] Set up backup and disaster recovery
    - [ ] Implement graceful shutdown
    - [ ] Configure auto-scaling policies
    - [ ] Set up CI/CD pipeline
    - [ ] Document API endpoints
    - [ ] Load test the deployment
    - [ ] Set up error tracking (Sentry, etc.)
    - [ ] Configure observability (traces, metrics, logs)

    The SDK's stateless design and async-first architecture make it ideal for modern cloud deployments, whether you're serving a few users or scaling to millions.

    ## 7. Production Considerations

    ### a) Scalability
    - Stateless agent design enables horizontal scaling
    - Async-first implementation for high concurrency
    - Session backends can be distributed (Redis, etc.)

    ### b) Reliability
    - Comprehensive error handling with custom exceptions
    - Guardrails for input/output validation
    - Configurable timeouts and retry mechanisms

    ### c) Monitoring
    - Rich tracing data with customizable metadata
    - Usage tracking for cost management
    - Event streaming for real-time monitoring

    ### d) Security
    - Guardrail system for content filtering
    - Configurable sensitive data exclusion from traces
    - Tool permission management through context

    ## 8. Advanced Patterns

    ### a) Hierarchical Agent Systems
    - Manager agents coordinating specialist agents
    - Dynamic agent selection based on task requirements
    - Result aggregation and synthesis

    ### b) Workflow Orchestration
    - Complex multi-step workflows with conditional logic
    - State machines implemented through handoffs
    - Dynamic workflow adaptation based on results

    ### c) Resource Management
    - Shared model clients across agents
    - Connection pooling for external services
    - Efficient message passing without duplication

    ## Design Philosophy

    The SDK's design philosophy emphasizes:
    - **Composability**: Small, focused agents that combine into complex systems
    - **Flexibility**: Multiple patterns for different use cases
    - **Observability**: Deep insights into agent behavior
    - **Production-readiness**: Built for scale, monitoring, and reliability

    This architecture enables building sophisticated multi-agent systems that can handle complex, real-world tasks while maintaining clarity and maintainability.