# /// script # dependencies = [ # "dspy", # "rich" # ] # /// import dspy import os import inspect import subprocess import argparse from typing import Any, Dict, Optional # Core business logic def list_files(path: str): """List files in a directory""" try: return os.listdir(path) except FileNotFoundError: return f"Error: Directory '{path}' does not exist" except PermissionError: return f"Error: Permission denied accessing directory '{path}'" except NotADirectoryError: return f"Error: '{path}' is not a directory" except Exception as e: return f"Error listing files in '{path}': {str(e)}" def read_file(path: str): """Read a file""" try: with open(path, 'r') as f: return f.read() except FileNotFoundError: return f"Error: File '{path}' does not exist" except PermissionError: return f"Error: Permission denied reading file '{path}'" except IsADirectoryError: return f"Error: '{path}' is a directory, not a file" except UnicodeDecodeError: return f"Error: Cannot decode file '{path}' - it may be binary or have wrong encoding" except Exception as e: return f"Error reading file '{path}': {str(e)}" def write_file(path: str, content: str): """Write content to a file""" try: with open(path, 'w') as f: f.write(content) return "Content written successfully" except PermissionError: return f"Error: Permission denied writing to file '{path}'" except IsADirectoryError: return f"Error: '{path}' is a directory, cannot write to it" except FileNotFoundError: return f"Error: Directory containing '{path}' does not exist" except OSError as e: if e.errno == 28: # No space left return f"Error: No space left on device when writing to '{path}'" return f"Error: OS error writing to '{path}': {str(e)}" except Exception as e: return f"Error writing to file '{path}': {str(e)}" def run_cmd(cmd: str): """Run a command in the shell and return the output.""" try: result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=30) if result.returncode == 0: return result.stdout.strip() if result.stdout.strip() else "Command executed successfully (no output)" else: error_msg = result.stderr.strip() if result.stderr.strip() else "Unknown error" return f"Command failed (exit code {result.returncode}): {error_msg}" except subprocess.TimeoutExpired: return f"Error: Command '{cmd}' timed out after 30 seconds" except FileNotFoundError: return f"Error: Command not found - '{cmd.split()[0] if cmd.split() else cmd}'" except PermissionError: return f"Error: Permission denied executing command '{cmd}'" except Exception as e: return f"Error executing command '{cmd}': {str(e)}" def finish(answer: str): """Conclude the trajectory and return the final answer.""" return answer def fn_metadata(func): sig = inspect.signature(func) doc = inspect.getdoc(func) or "No docstring." return dict(function_name=func.__name__, arguments=str(sig), docstring=doc) # UI functions for clean display from rich.console import Console console = Console() def confirm_cmd(cmd: str) -> bool: console.print() console.print("│ Execute command:", style="dim") console.print(f"│ [bold cyan]$ {cmd}[/bold cyan]") resp = console.input("│ Continue? [dim](y/n)[/dim] ") return resp.lower() == 'y' def show_step(item: Dict[str, Any]): fn = item['selected_fn'] args = item['args'] # Function call line console.print(f"\n[dim]→[/dim] [bold]{fn}[/bold]", end="") # Show args inline if simple, otherwise on new line if args: args_str = ', '.join(f"{k}={repr(v)[:50]}" for k, v in args.items()) if len(args_str) < 40: console.print(f"[dim]({args_str})[/dim]") else: console.print() for k, v in args.items(): val_str = repr(v) if len(val_str) > 60: val_str = val_str[:57] + "..." console.print(f" [dim]{k}:[/dim] {val_str}") else: console.print() # Show full reasoning (no truncation) if fn != "finish": reason = item['reasoning'] lines = reason.split('\n') for line in lines: if line.strip(): console.print(f" [dim italic]{line}[/dim italic]") # Show output if it's short and relevant output = str(item.get('fn_output', '')) if output and len(output) < 100 and fn != "finish": console.print(f" [dim]↳[/dim] [green]{output[:80]}[/green]") def get_input() -> Optional[str]: console.print() msg = console.input("[bold]>[/bold] ") return None if msg.lower() in ["exit", "quit", "q"] else msg def show_answer(answer: str): console.print() console.print("─" * 40, style="dim") if len(answer) < 100: console.print(answer) else: lines = answer.split('\n') for line in lines: if line.strip(): console.print(line) console.print("─" * 40, style="dim") def show_prompt(): console.clear() console.print("[bold]DSPy Agent CLI[/bold] [dim]• Type 'exit' to quit[/dim]\n") def main(): # Parse command line arguments parser = argparse.ArgumentParser(description='DSPy Agent CLI') parser.add_argument('--max-steps', type=int, default=10, help='Maximum number of steps for the agent (default: 10)') args = parser.parse_args() # Setup tools = { "list_files": list_files, "read_file": read_file, "write_file": write_file, "run_cmd": run_cmd, "finish": finish } tools_metadata = {nm: fn_metadata(fn) for nm, fn in tools.items()} lm = dspy.LM("groq/moonshotai/kimi-k2-instruct", api_key=os.environ["GROQ_API_KEY"]) dspy.configure(lm=lm) sig = dspy.Signature('question, trajectory, functions, history -> next_selected_fn, args: dict[str, Any]') react = dspy.ChainOfThought(sig) max_steps = args.max_steps # Main loop show_prompt() history = dspy.History(messages=[]) while True: msg = get_input() if msg is None: break # Agent logic moved into main loop traj = [] for _ in range(max_steps): pred = react(question=msg, trajectory=traj, functions=tools_metadata, history=history) fn_name = pred.next_selected_fn.strip('"').strip("'") # Handle command confirmation if fn_name == "run_cmd": if not confirm_cmd(pred.args['cmd']): output = "Command execution cancelled by user" else: output = tools[fn_name](**pred.args) else: output = tools[fn_name](**pred.args) item = dict(reasoning=pred.reasoning, selected_fn=fn_name, args=pred.args, fn_output=output) traj.append(item) show_step(item) if fn_name == "finish": break history.messages.append({"message": msg, "answer": output}) show_answer(output) if __name__ == "__main__": main()