Last active
October 27, 2025 15:33
-
-
Save MisreadableMind/a529af310d57622742943f67d10c8db2 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Schema-Guided Reasoning (SGR) Demo with OpenAI | |
| This Python code demonstrates Schema-Guided Reasoning (SGR) with OpenAI. It: | |
| - Implements a business agent capable of planning and reasoning | |
| - Implements tool calling using only SGR and simple dispatch | |
| - Uses a simple (inexpensive) non-reasoning model for that | |
| To give this agent something to work with, we ask it to help with running | |
| a small business - selling courses to help achieve AGI faster. | |
| Once this script starts, it will emulate in-memory CRM with invoices, | |
| emails, products and rules. Then it will execute sequentially a set of | |
| tasks (see TASKS below). In order to carry them out, Agent will have to use | |
| tools to issue invoices, create rules, send emails, and a few others. | |
| Read more about SGR by Rinat Abdullin: http://abdullin.com/schema-guided-reasoning/ | |
| This demo is described in more detail here: https://abdullin.com/schema-guided-reasoning/demo | |
| """ | |
| from typing import List, Union, Literal, Annotated | |
| from annotated_types import MaxLen, Le, MinLen | |
| from pydantic import BaseModel, Field | |
| import json | |
| import time | |
| import os | |
| from datetime import datetime | |
| from openai import OpenAI | |
| from rich.console import Console | |
| from rich.panel import Panel | |
| from rich.rule import Rule | |
| from rich.table import Table | |
| from rich.text import Text | |
| from rich.align import Align | |
| from rich.tree import Tree | |
| DB = { | |
| "rules": [], | |
| "invoices": {}, | |
| "emails": [], | |
| "products": { | |
| "SKU-205": {"name": "AGI 101 Course Personal", "price": 258}, | |
| "SKU-210": {"name": "AGI 101 Course Team (5 seats)", "price": 1290}, | |
| "SKU-220": {"name": "Building AGI - online exercises", "price": 315}, | |
| }, | |
| } | |
| class SendEmail(BaseModel): | |
| tool: Literal["send_email"] | |
| subject: str | |
| message: str | |
| files: List[str] | |
| recipient_email: str | |
| class GetCustomerData(BaseModel): | |
| tool: Literal["get_customer_data"] | |
| email: str | |
| class IssueInvoice(BaseModel): | |
| tool: Literal["issue_invoice"] | |
| email: str | |
| skus: List[str] | |
| discount_percent: Annotated[int, Le(50)] | |
| class VoidInvoice(BaseModel): | |
| tool: Literal["void_invoice"] | |
| invoice_id: str | |
| reason: str | |
| class CreateRule(BaseModel): | |
| tool: Literal["remember"] | |
| email: str | |
| rule: str | |
| class ReportTaskCompletion(BaseModel): | |
| tool: Literal["report_completion"] | |
| completed_steps_laconic: List[str] | |
| code: Literal["completed", "failed"] | |
| class NextStep(BaseModel): | |
| current_state: str | |
| plan_remaining_steps_brief: Annotated[List[str], MinLen(1), MaxLen(5)] | |
| task_completed: bool | |
| function: Union[ | |
| ReportTaskCompletion, | |
| SendEmail, | |
| GetCustomerData, | |
| IssueInvoice, | |
| VoidInvoice, | |
| CreateRule, | |
| ] = Field(..., description="execute first remaining step") | |
| # Global report logger | |
| class MarkdownReporter: | |
| def __init__(self): | |
| self.reset() | |
| def reset(self): | |
| self.content = [] | |
| timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
| self.content.append(f"# Agent Execution Report\n") | |
| self.content.append(f"**Generated:** {timestamp}\n\n") | |
| self.content.append("## Available Products\n") | |
| for sku, product in DB["products"].items(): | |
| self.content.append(f"- **{sku}**: {product['name']} - ${product['price']:,.2f}") | |
| self.content.append("\n---\n\n") | |
| def add_task_header(self, task_idx, task): | |
| self.content.append(f"## Task {task_idx}: {task}\n") | |
| def add_step(self, step_num, action, function_data, result): | |
| self.content.append(f"### Step {step_num}\n") | |
| self.content.append(f"**Action:** {action}\n\n") | |
| self.content.append(f"**Function:** `{function_data.get('tool', 'unknown')}`\n\n") | |
| if isinstance(result, dict): | |
| self.content.append("**Result:**\n```json\n") | |
| self.content.append(json.dumps(result, indent=2)) | |
| self.content.append("\n```\n\n") | |
| else: | |
| self.content.append(f"**Result:** {result}\n\n") | |
| def add_task_completion(self, status, steps): | |
| icon = "β " if status == "completed" else "β" | |
| self.content.append(f"### Task Result: {icon} {status.upper()}\n") | |
| self.content.append("**Completed Steps:**\n") | |
| for step in steps: | |
| self.content.append(f"- {step}") | |
| self.content.append("\n---\n\n") | |
| def save_report(self, filename=None): | |
| if filename is None: | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| filename = f"agent_report_{timestamp}.md" | |
| with open(filename, 'w', encoding='utf-8') as f: | |
| f.write('\n'.join(self.content)) | |
| return filename | |
| def format_currency(amount): | |
| return f"${amount:,.2f}" | |
| def format_tool_result(result, tool_name): | |
| console = Console() | |
| if isinstance(result, str): | |
| return Panel(result, title=f"π§ {tool_name.replace('_', ' ').title()}", | |
| title_align="left", border_style="yellow") | |
| if tool_name == "send_email": | |
| table = Table(show_header=False, box=None, padding=(0, 1)) | |
| table.add_row("π§", f"[cyan]To:[/cyan] {result['to']}") | |
| table.add_row("π", f"[cyan]Subject:[/cyan] {result['subject']}") | |
| table.add_row("π¬", f"[cyan]Message:[/cyan] {result['message']}") | |
| return Panel(table, title="π¨ Email Sent", title_align="left", border_style="green") | |
| elif tool_name == "remember": | |
| table = Table(show_header=False, box=None, padding=(0, 1)) | |
| table.add_row("π€", f"[cyan]Customer:[/cyan] {result['email']}") | |
| table.add_row("π", f"[cyan]Rule:[/cyan] {result['rule']}") | |
| return Panel(table, title="π§ Rule Created", title_align="left", border_style="blue") | |
| elif tool_name == "issue_invoice": | |
| table = Table(show_header=False, box=None, padding=(0, 1)) | |
| table.add_row("π", f"[cyan]Invoice ID:[/cyan] {result['id']}") | |
| table.add_row("π€", f"[cyan]Customer:[/cyan] {result['email']}") | |
| table.add_row("ποΈ", f"[cyan]Products:[/cyan] {', '.join(result['skus'])}") | |
| table.add_row("π°", f"[cyan]Total:[/cyan] {format_currency(result['total'])}") | |
| if result['discount_percent'] > 0: | |
| table.add_row("π―", | |
| f"[cyan]Discount:[/cyan] {result['discount_percent']}% ({format_currency(result['discount_amount'])})") | |
| table.add_row("π", f"[cyan]File:[/cyan] {result['file']}") | |
| return Panel(table, title="π§Ύ Invoice Generated", title_align="left", border_style="green") | |
| elif tool_name == "get_customer_data": | |
| tree = Tree("π€ Customer Data") | |
| rules_node = tree.add("π Rules") | |
| for rule in result.get('rules', []): | |
| rules_node.add(f"β’ {rule['rule']}") | |
| invoices_node = tree.add("π§Ύ Invoices") | |
| for inv_id, invoice in result.get('invoices', []): | |
| status = "β VOID" if invoice.get('void') else "β Active" | |
| invoices_node.add(f"β’ {inv_id}: {format_currency(invoice['total'])} {status}") | |
| emails_node = tree.add("π§ Emails") | |
| for email in result.get('emails', []): | |
| emails_node.add(f"β’ {email['subject']}") | |
| return Panel(tree, title="π Customer Profile", title_align="left", border_style="cyan") | |
| return Panel(str(result), title=f"π§ {tool_name.replace('_', ' ').title()}", | |
| title_align="left", border_style="white") | |
| def dispatch(cmd: BaseModel): | |
| if isinstance(cmd, SendEmail): | |
| email = { | |
| "to": cmd.recipient_email, | |
| "subject": cmd.subject, | |
| "message": cmd.message, | |
| } | |
| DB["emails"].append(email) | |
| return email | |
| if isinstance(cmd, CreateRule): | |
| rule = { | |
| "email": cmd.email, | |
| "rule": cmd.rule, | |
| } | |
| DB["rules"].append(rule) | |
| return rule | |
| if isinstance(cmd, GetCustomerData): | |
| addr = cmd.email | |
| return { | |
| "rules": [r for r in DB["rules"] if r["email"] == addr], | |
| "invoices": [t for t in DB["invoices"].items() if t[1]["email"] == addr], | |
| "emails": [e for e in DB["emails"] if e.get("to") == addr], | |
| } | |
| if isinstance(cmd, IssueInvoice): | |
| total = 0.0 | |
| for sku in cmd.skus: | |
| product = DB["products"].get(sku) | |
| if not product: | |
| return f"Product {sku} not found" | |
| total += product["price"] | |
| discount = round(total * 1.0 * cmd.discount_percent / 100.0, 2) | |
| invoice_id = f"INV-{len(DB['invoices']) + 1}" | |
| invoice = { | |
| "id": invoice_id, | |
| "email": cmd.email, | |
| "file": f"/invoices/{invoice_id}.pdf", | |
| "skus": cmd.skus, | |
| "discount_amount": discount, | |
| "discount_percent": cmd.discount_percent, | |
| "total": total - discount, | |
| "void": False, | |
| } | |
| DB["invoices"][invoice_id] = invoice | |
| return invoice | |
| if isinstance(cmd, VoidInvoice): | |
| invoice = DB["invoices"].get(cmd.invoice_id) | |
| if not invoice: | |
| return f"Invoice {cmd.invoice_id} not found" | |
| invoice["void"] = True | |
| return invoice | |
| TASKS = [ | |
| "Rule: address [email protected] as 'The SAMA', always give him 5% discount.", | |
| "Rule for [email protected]: Email his invoices to [email protected]", | |
| "[email protected] wants one of each product. Email him the invoice", | |
| "[email protected] wants 2x of what [email protected] got. Send invoice", | |
| "redo last [email protected] invoice: use 3x discount of [email protected]", | |
| ] | |
| system_prompt = f""" | |
| You are a business assistant helping Rinat Abdullin with customer interactions. | |
| - Clearly report when tasks are done. | |
| - Always send customers emails after issuing invoices (with invoice attached). | |
| - Be laconic. Especially in emails | |
| - No need to wait for payment confirmation before proceeding. | |
| - Always check customer data before issuing invoices or making changes. | |
| Products: {DB["products"]}""".strip() | |
| client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) | |
| console = Console() | |
| print = console.print | |
| reporter = MarkdownReporter() | |
| def print_header(): | |
| header_text = Text("π€ Schema-Guided Reasoning Agent", style="bold magenta") | |
| subtitle_text = Text("Business Automation & Customer Management", style="italic cyan") | |
| print(Panel( | |
| Align.center(header_text + "\n" + subtitle_text), | |
| border_style="magenta", | |
| padding=(1, 2) | |
| )) | |
| def print_products_table(): | |
| table = Table(title="π¦ Available Products", show_header=True, header_style="bold cyan") | |
| table.add_column("SKU", style="yellow", no_wrap=True) | |
| table.add_column("Product Name", style="green") | |
| table.add_column("Price", style="magenta", justify="right") | |
| for sku, product in DB["products"].items(): | |
| table.add_row(sku, product["name"], format_currency(product["price"])) | |
| print(table) | |
| print() | |
| def execute_tasks(): | |
| print_header() | |
| print_products_table() | |
| for task_idx, task in enumerate(TASKS, 1): | |
| task_header = Text(f"Task {task_idx}", style="bold white on blue") | |
| task_content = Text(task, style="bold white") | |
| print(Panel( | |
| task_header + "\n" + task_content, | |
| title="π― New Mission", | |
| title_align="left", | |
| border_style="blue", | |
| padding=(1, 2) | |
| )) | |
| # Log task to markdown | |
| reporter.add_task_header(task_idx, task) | |
| log = [ | |
| {"role": "system", "content": system_prompt}, | |
| {"role": "user", "content": task} | |
| ] | |
| steps_completed = [] | |
| for i in range(20): | |
| step = f"step_{i + 1}" | |
| step_text = Text(f"π§ Planning {step}...", style="bold yellow") | |
| print(step_text) | |
| time.sleep(0.5) | |
| try: | |
| completion = client.beta.chat.completions.parse( | |
| model="gpt-4o", | |
| response_format=NextStep, | |
| messages=log, | |
| max_completion_tokens=10000, | |
| ) | |
| job = completion.choices[0].message.parsed | |
| except Exception as e: | |
| print(Panel(f"Error: {str(e)}", title="API Error", border_style="red")) | |
| break | |
| if isinstance(job.function, ReportTaskCompletion): | |
| status_color = "green" if job.function.code == "completed" else "red" | |
| status_icon = "β " if job.function.code == "completed" else "β" | |
| print(Panel( | |
| f"{status_icon} Task {job.function.code.upper()}", | |
| style=f"bold {status_color}", | |
| border_style=status_color | |
| )) | |
| steps_table = Table(title="π Completed Steps", show_header=False) | |
| steps_table.add_column("Step", style="cyan") | |
| for step_desc in job.function.completed_steps_laconic: | |
| steps_table.add_row(f"β {step_desc}") | |
| print(steps_table) | |
| print(Rule(style="dim")) | |
| # Log completion to markdown | |
| reporter.add_task_completion(job.function.code, job.function.completed_steps_laconic) | |
| break | |
| next_step = job.plan_remaining_steps_brief[0] | |
| print(Panel( | |
| f"π― [bold cyan]Next Action:[/bold cyan] {next_step}", | |
| border_style="cyan" | |
| )) | |
| log.append({ | |
| "role": "assistant", | |
| "content": next_step, | |
| "tool_calls": [{ | |
| "type": "function", | |
| "id": step, | |
| "function": { | |
| "name": job.function.tool, | |
| "arguments": job.function.model_dump_json(), | |
| }}] | |
| }) | |
| result = dispatch(job.function) | |
| # Log step to markdown | |
| reporter.add_step(i + 1, next_step, {"tool": job.function.tool}, result) | |
| formatted_result = format_tool_result(result, job.function.tool) | |
| print(formatted_result) | |
| txt = result if isinstance(result, str) else json.dumps(result) | |
| log.append({"role": "tool", "content": txt, "tool_call_id": step}) | |
| steps_completed.append(next_step) | |
| print() | |
| print("\n" + "=" * 80 + "\n") | |
| # Save the markdown report | |
| filename = reporter.save_report() | |
| print(Panel( | |
| f"π Report saved to: {filename}", | |
| title="Report Generated", | |
| border_style="green" | |
| )) | |
| if __name__ == "__main__": | |
| execute_tasks() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment