Last active
October 25, 2025 18:14
-
-
Save sandeepkunkunuru/b879f8a5c00a66f29d63b47fda0b0c3e to your computer and use it in GitHub Desktop.
Revisions
-
sandeepkunkunuru revised this gist
Oct 25, 2025 . 1 changed file with 14 additions and 7 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -681,15 +681,23 @@ def main(): org_dir = ORGS[org] org_dir.mkdir(parents=True, exist_ok=True) # Track stats per repo (not per org) repo_stats = {} # {repo_name: {'found': count, 'reviewed': count}} for pr in prs: repo_name = pr['_parsed_repo'] # Initialize repo stats if first time seeing this repo if repo_name not in repo_stats: repo_stats[repo_name] = {'found': 0, 'reviewed': 0} repo_stats[repo_name]['found'] += 1 result = review_pr(pr, org_dir) if result['reviewed']: stats['reviewed'] += 1 repo_stats[repo_name]['reviewed'] += 1 if result['decision'] == 'APPROVE': stats['approved'] += 1 @@ -704,10 +712,9 @@ def main(): else: stats['errors'] += 1 # Update repo state for each unique repo for repo_name, counts in repo_stats.items(): update_repo_state(org, repo_name, counts['found'], counts['reviewed']) # Record automation run duration = int(time.time() - run_start) -
sandeepkunkunuru revised this gist
Oct 25, 2025 . 1 changed file with 14 additions and 7 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -366,6 +366,12 @@ def review_with_claude(pr_data: Dict, diff: str, pr_info: Dict) -> Tuple[Optiona # Parse org/repo from nameWithOwner repo_full = pr_data['repository']['nameWithOwner'] # Filter reviews to only show ones for the current commit being reviewed # This prevents Claude from re-reporting issues that were fixed in newer commits current_commit_sha = pr_info.get('headRefOid') current_reviews = [r for r in pr_info.get('reviews', []) if r.get('commit', {}).get('oid') == current_commit_sha] # Create review context context = f"""# PR Review Context @@ -377,8 +383,8 @@ def review_with_claude(pr_data: Dict, diff: str, pr_info: Dict) -> Tuple[Optiona ## Description {pr_info.get('body', 'No description provided')} ## Reviews (for current commit {current_commit_sha[:8] if current_commit_sha else 'unknown'} only) {json.dumps(current_reviews, indent=2)} ## Comments {json.dumps(pr_info.get('comments', []), indent=2)} @@ -514,7 +520,7 @@ def review_pr(pr_data: Dict, org_dir: Path) -> Dict: # Sync repo if not sync_repo(org, repo, org_dir): log_error(f"Failed to sync repo {org}/{repo}") record_review(org, repo, pr_number, "unknown", "ERROR", pr_data, "Failed to sync repo", duration=int(time.time() - start_time)) return result @@ -528,7 +534,8 @@ def review_pr(pr_data: Dict, org_dir: Path) -> Dict: run_command(["gh", "pr", "checkout", str(pr_number)]) except Exception as e: log_error(f"Failed to checkout PR #{pr_number}: {e}") # Note: commit_sha is not available yet at this point record_review(org, repo, pr_number, "unknown", "ERROR", pr_data, f"Failed to checkout: {e}", duration=int(time.time() - start_time)) return result @@ -697,9 +704,9 @@ def main(): else: stats['errors'] += 1 # Update repo state for each repo we processed for pr in prs: repo = pr['_parsed_repo'] update_repo_state(org, repo, found_count, reviewed_count) # Record automation run -
sandeepkunkunuru revised this gist
Oct 25, 2025 . 1 changed file with 0 additions and 702 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,702 +0,0 @@ -
sandeepkunkunuru revised this gist
Oct 25, 2025 . 1 changed file with 702 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,702 @@ #!/usr/bin/env python3 """ Optimized PR Review Automation Fast, cheap, and smart PR review automation using state tracking """ import json import subprocess import sqlite3 import sys import tempfile from datetime import datetime, timedelta from pathlib import Path from typing import Dict, List, Optional, Tuple import time import os # Configuration BASE_DIR = Path("/Users/user/projects") DB_PATH = BASE_DIR / "pr-reviews.db" LOG_PATH = BASE_DIR / "pr-review.log" ERROR_LOG_PATH = BASE_DIR / "pr-review-error.log" ORGS = { "VaidhyaMegha": BASE_DIR / "VaidhyaMegha", "Profintech-Technologies": BASE_DIR / "ProFinTech" } # Staleness thresholds MAX_PR_AGE_DAYS = 60 MAX_INACTIVITY_DAYS = 21 # Colors for output class Colors: RED = '\033[0;31m' GREEN = '\033[0;32m' YELLOW = '\033[1;33m' BLUE = '\033[0;34m' CYAN = '\033[0;36m' NC = '\033[0m' def log(level: str, message: str, color: str = Colors.NC): """Log message to console and file""" timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") formatted = f"{color}[{level}]{Colors.NC} {message}" plain = f"[{level}] {message}" print(formatted) # Append to log file with open(LOG_PATH, 'a') as f: f.write(f"{timestamp} {plain}\n") def log_error(message: str): """Log error to both logs""" log("ERROR", message, Colors.RED) with open(ERROR_LOG_PATH, 'a') as f: f.write(f"{datetime.now().isoformat()} [ERROR] {message}\n") def run_command(cmd: List[str], check: bool = True) -> Optional[str]: """Run shell command and return output""" try: result = subprocess.run( cmd, capture_output=True, text=True, check=check ) return result.stdout.strip() except subprocess.CalledProcessError as e: if check: log_error(f"Command failed: {' '.join(cmd)}\n{e.stderr}") return None except Exception as e: log_error(f"Command error: {' '.join(cmd)}\n{str(e)}") return None def init_database(): """Initialize or update database schema""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() # Create repo_state table for tracking cursor.execute(""" CREATE TABLE IF NOT EXISTS repo_state ( org TEXT NOT NULL, repo TEXT NOT NULL, last_checked_at TEXT, last_pr_found_at TEXT, total_prs_seen INTEGER DEFAULT 0, total_prs_reviewed INTEGER DEFAULT 0, active_score REAL DEFAULT 1.0, PRIMARY KEY (org, repo) ) """) # Create reviewed_commits table for fast lookup cursor.execute(""" CREATE TABLE IF NOT EXISTS reviewed_commits ( org TEXT NOT NULL, repo TEXT NOT NULL, pr_number INTEGER NOT NULL, commit_sha TEXT NOT NULL, reviewed_at TEXT NOT NULL, decision TEXT, PRIMARY KEY (org, repo, pr_number, commit_sha) ) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_reviewed_commits_sha ON reviewed_commits(org, repo, commit_sha) """) conn.commit() conn.close() log("INFO", "Database initialized", Colors.GREEN) def check_prerequisites() -> bool: """Check if all required tools are installed""" log("INFO", "Checking prerequisites...", Colors.BLUE) tools = { "gh": "GitHub CLI (brew install gh)", "git": "Git", "claude": "Claude CLI (https://claude.com/download)" } for tool, install_msg in tools.items(): if not run_command(["command", "-v", tool], check=False): log_error(f"{tool} not found. Install: {install_msg}") return False # Check GH auth if run_command(["gh", "auth", "status"], check=False) is None: log_error("Not authenticated with GitHub. Run: gh auth login") return False log("INFO", "All prerequisites met", Colors.GREEN) return True def get_current_user() -> str: """Get current GitHub username""" output = run_command(["gh", "api", "user", "-q", ".login"]) return output if output else "unknown" def is_pr_stale(created_at: str, updated_at: str) -> Tuple[bool, Optional[str]]: """Check if PR is stale""" try: created = datetime.fromisoformat(created_at.replace('Z', '+00:00')) updated = datetime.fromisoformat(updated_at.replace('Z', '+00:00')) now = datetime.now(created.tzinfo) age_days = (now - created).days inactive_days = (now - updated).days if age_days > MAX_PR_AGE_DAYS: return True, f"PR is {age_days} days old (max: {MAX_PR_AGE_DAYS})" if inactive_days > MAX_INACTIVITY_DAYS: return True, f"Inactive for {inactive_days} days (max: {MAX_INACTIVITY_DAYS})" return False, None except Exception as e: log_error(f"Error checking staleness: {e}") return False, None def already_reviewed(org: str, repo: str, pr_number: int, commit_sha: str) -> bool: """Check if we've already reviewed this specific commit""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute(""" SELECT 1 FROM reviewed_commits WHERE org = ? AND repo = ? AND pr_number = ? AND commit_sha = ? """, (org, repo, pr_number, commit_sha)) exists = cursor.fetchone() is not None conn.close() return exists def record_review(org: str, repo: str, pr_number: int, commit_sha: str, decision: str, pr_data: Dict, feedback: str = None, merged: bool = False, duration: int = 0): """Record a review in the database""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() now = datetime.now().isoformat() # Record in reviewed_commits for fast lookup cursor.execute(""" INSERT OR REPLACE INTO reviewed_commits (org, repo, pr_number, commit_sha, reviewed_at, decision) VALUES (?, ?, ?, ?, ?, ?) """, (org, repo, pr_number, commit_sha, now, decision)) # Record full details in pr_reviews author = pr_data.get('author', {}).get('login') if isinstance(pr_data.get('author'), dict) else pr_data.get('author') cursor.execute(""" INSERT INTO pr_reviews (review_timestamp, org, repo, pr_number, pr_title, pr_author, pr_url, pr_created_at, pr_updated_at, commit_sha, decision, feedback, merged, merge_timestamp, run_duration_seconds) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( now, org, repo, pr_number, pr_data.get('title'), author, pr_data.get('url'), pr_data.get('createdAt'), pr_data.get('updatedAt'), commit_sha, decision, feedback, 1 if merged else 0, now if merged else None, duration )) conn.commit() conn.close() def update_repo_state(org: str, repo: str, found_prs: int, reviewed_prs: int): """Update repository state tracking""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() now = datetime.now().isoformat() cursor.execute(""" INSERT INTO repo_state (org, repo, last_checked_at, last_pr_found_at, total_prs_seen, total_prs_reviewed, active_score) VALUES (?, ?, ?, ?, ?, ?, 1.0) ON CONFLICT(org, repo) DO UPDATE SET last_checked_at = ?, last_pr_found_at = CASE WHEN ? > 0 THEN ? ELSE last_pr_found_at END, total_prs_seen = total_prs_seen + ?, total_prs_reviewed = total_prs_reviewed + ?, active_score = CASE WHEN ? > 0 THEN MIN(active_score * 1.1, 2.0) ELSE MAX(active_score * 0.9, 0.1) END """, (org, repo, now, now if found_prs > 0 else None, found_prs, reviewed_prs, now, found_prs, now, found_prs, reviewed_prs, found_prs)) conn.commit() conn.close() def fetch_all_review_prs() -> List[Dict]: """Fetch all PRs where current user is requested as reviewer using global search""" log("INFO", "Fetching PRs using global search (fast method)...", Colors.CYAN) # Use GitHub's search API via gh CLI - ONE API CALL instead of looping repos! cmd = [ "gh", "search", "prs", "--review-requested", "@me", "--state", "open", "--json", "number,title,repository,author,createdAt,updatedAt,url", "--limit", "100" ] output = run_command(cmd) if not output: log("WARNING", "No PRs found or search failed", Colors.YELLOW) return [] try: prs = json.loads(output) log("INFO", f"Found {len(prs)} PRs across all orgs", Colors.GREEN) return prs except json.JSONDecodeError as e: log_error(f"Failed to parse PR search results: {e}") return [] def sync_repo(org: str, repo: str, repo_dir: Path) -> bool: """Clone or update repository""" repo_path = repo_dir / repo try: if repo_path.exists(): log("INFO", f"Updating repo: {org}/{repo}", Colors.BLUE) os.chdir(repo_path) run_command(["git", "fetch", "--all", "--prune"]) else: log("INFO", f"Cloning repo: {org}/{repo}", Colors.BLUE) repo_dir.mkdir(parents=True, exist_ok=True) os.chdir(repo_dir) run_command(["gh", "repo", "clone", f"{org}/{repo}"]) os.chdir(repo) return True except Exception as e: log_error(f"Failed to sync repo {org}/{repo}: {e}") return False def get_pr_diff(pr_number: int, base_branch: str) -> Optional[str]: """Get PR diff""" try: # Fetch base branch run_command(["git", "fetch", "origin", base_branch], check=False) # Get diff diff = run_command(["git", "diff", f"origin/{base_branch}...HEAD"]) return diff except Exception as e: log_error(f"Failed to get diff for PR #{pr_number}: {e}") return None def review_with_claude(pr_data: Dict, diff: str, pr_info: Dict) -> Tuple[Optional[str], Optional[str]]: """Use Claude to review the PR""" # Parse org/repo from nameWithOwner repo_full = pr_data['repository']['nameWithOwner'] # Filter reviews to only show ones for the current commit being reviewed # This prevents Claude from re-reporting issues that were fixed in newer commits current_commit_sha = pr_info.get('headRefOid') current_reviews = [r for r in pr_info.get('reviews', []) if r.get('commit', {}).get('oid') == current_commit_sha] # Create review context context = f"""# PR Review Context ## Repository: {repo_full} ## PR #{pr_data['number']}: {pr_data['title']} ## Author: @{pr_data['author']['login']} ## URL: {pr_data['url']} ## Description {pr_info.get('body', 'No description provided')} ## Reviews (for current commit {current_commit_sha[:8] if current_commit_sha else 'unknown'} only) {json.dumps(current_reviews, indent=2)} ## Comments {json.dumps(pr_info.get('comments', []), indent=2)} ## Diff Summary {run_command(['git', 'diff', '--stat', f"origin/{pr_info.get('baseRefName', 'main')}...HEAD"]) or 'N/A'} ## Full Diff ```diff {diff} ``` --- Please review this PR thoroughly: 1. Check code quality, style, and best practices 2. Look for potential bugs or security issues 3. Verify if tests are included and appropriate 4. Review the conversation history for any concerns 5. Provide a decision: APPROVE or REQUEST_CHANGES 6. If requesting changes, provide specific, actionable feedback Format your response as: DECISION: [APPROVE|REQUEST_CHANGES] FEEDBACK: [Your detailed feedback here] """ # Write to temp file with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f: f.write(context) temp_path = f.name try: log("INFO", "Running Claude review...", Colors.CYAN) result = run_command(["claude", "-p", context], check=False) if not result: return None, None # Parse decision decision_line = [line for line in result.split('\n') if 'DECISION:' in line.upper()] decision = None if decision_line: decision = decision_line[0].split(':', 1)[1].strip().replace('*', '').upper() # Parse feedback feedback_parts = result.split('FEEDBACK:', 1) feedback = feedback_parts[1].strip() if len(feedback_parts) > 1 else result return decision, feedback finally: Path(temp_path).unlink(missing_ok=True) def submit_review(org: str, repo: str, pr_number: int, decision: str, feedback: str) -> bool: """Submit review to GitHub""" try: if decision == "APPROVE": run_command(["gh", "pr", "review", str(pr_number), "--repo", f"{org}/{repo}", "--approve", "--body", feedback]) return True elif "REQUEST" in decision: run_command(["gh", "pr", "review", str(pr_number), "--repo", f"{org}/{repo}", "--request-changes", "--body", feedback]) return True return False except Exception as e: log_error(f"Failed to submit review: {e}") return False def merge_pr(org: str, repo: str, pr_number: int) -> bool: """Merge PR with squash""" try: log("INFO", f"Attempting to merge PR #{pr_number}...", Colors.CYAN) result = run_command([ "gh", "pr", "merge", str(pr_number), "--repo", f"{org}/{repo}", "--squash", "--delete-branch" ], check=False) if result is not None: log("SUCCESS", f"PR #{pr_number} merged successfully!", Colors.GREEN) return True else: log("WARNING", f"Failed to merge PR #{pr_number}", Colors.YELLOW) return False except Exception as e: log_error(f"Error merging PR #{pr_number}: {e}") return False def review_pr(pr_data: Dict, org_dir: Path) -> Dict: """Review a single PR""" # Use parsed org/repo from main() org = pr_data.get('_parsed_org') repo = pr_data.get('_parsed_repo') # Fallback if not parsed yet if not org or not repo: repo_full = pr_data['repository']['nameWithOwner'] org, repo = repo_full.split('/', 1) pr_number = pr_data['number'] start_time = time.time() result = { 'org': org, 'repo': repo, 'pr_number': pr_number, 'reviewed': False, 'decision': None, 'merged': False } log("INFO", "=" * 60, Colors.BLUE) log("INFO", f"Processing PR #{pr_number} in {org}/{repo}", Colors.BLUE) log("INFO", f"Title: {pr_data['title']}", Colors.BLUE) log("INFO", "=" * 60, Colors.BLUE) # Check staleness first (before fetching more data) is_stale, stale_reason = is_pr_stale(pr_data['createdAt'], pr_data['updatedAt']) if is_stale: log("WARNING", f"Skipping stale PR: {stale_reason}", Colors.YELLOW) # Use a placeholder commit_sha for skipped stale PRs record_review(org, repo, pr_number, "stale", "SKIP_STALE", pr_data, stale_reason, duration=int(time.time() - start_time)) return result # Sync repo if not sync_repo(org, repo, org_dir): log_error(f"Failed to sync repo {org}/{repo}") record_review(org, repo, pr_number, commit_sha, "ERROR", pr_data, "Failed to sync repo", duration=int(time.time() - start_time)) return result repo_path = org_dir / repo os.chdir(repo_path) # Stash and checkout PR run_command(["git", "stash", "push", "-u", "-m", "Auto-stash"], check=False) try: run_command(["gh", "pr", "checkout", str(pr_number)]) except Exception as e: log_error(f"Failed to checkout PR #{pr_number}: {e}") # Note: commit_sha is not available yet at this point record_review(org, repo, pr_number, "unknown", "ERROR", pr_data, f"Failed to checkout: {e}", duration=int(time.time() - start_time)) return result # Get PR details including commit SHA pr_info_json = run_command([ "gh", "pr", "view", str(pr_number), "--json", "title,body,author,baseRefName,reviews,comments,headRefOid" ]) if not pr_info_json: log_error("Failed to get PR details") return result pr_info = json.loads(pr_info_json) base_branch = pr_info.get('baseRefName', 'main') commit_sha = pr_info.get('headRefOid', 'unknown') # CRITICAL: Ensure we're on the latest commit of the PR log("INFO", f"Ensuring we're on the latest commit: {commit_sha[:8]}...", Colors.CYAN) try: run_command(["git", "fetch", "origin", f"pull/{pr_number}/head"]) run_command(["git", "reset", "--hard", commit_sha]) # Verify we're on the correct commit current_sha = run_command(["git", "rev-parse", "HEAD"]) if current_sha != commit_sha: log_error(f"Failed to checkout correct commit. Expected {commit_sha[:8]}, got {current_sha[:8]}") record_review(org, repo, pr_number, commit_sha, "ERROR", pr_data, "Failed to checkout correct commit", duration=int(time.time() - start_time)) return result log("SUCCESS", f"Verified on correct commit: {commit_sha[:8]}", Colors.GREEN) except Exception as e: log_error(f"Failed to ensure latest commit: {e}") record_review(org, repo, pr_number, commit_sha, "ERROR", pr_data, f"Failed to ensure latest commit: {e}", duration=int(time.time() - start_time)) return result # Now check if we've already reviewed this specific commit if already_reviewed(org, repo, pr_number, commit_sha): log("INFO", f"Already reviewed commit {commit_sha[:8]} - skipping", Colors.YELLOW) record_review(org, repo, pr_number, commit_sha, "SKIP_ALREADY_REVIEWED", pr_data, "Already reviewed this commit", duration=int(time.time() - start_time)) return result # Get diff diff = get_pr_diff(pr_number, base_branch) if not diff: log_error("Failed to get PR diff") record_review(org, repo, pr_number, commit_sha, "ERROR", pr_data, "Failed to get diff", duration=int(time.time() - start_time)) return result # Review with Claude decision, feedback = review_with_claude(pr_data, diff, pr_info) if not decision: log_error("Failed to get decision from Claude") record_review(org, repo, pr_number, commit_sha, "ERROR", pr_data, "Claude review failed", duration=int(time.time() - start_time)) return result log("INFO", f"Claude's decision: {decision}", Colors.CYAN) result['decision'] = decision result['reviewed'] = True # Submit review if not submit_review(org, repo, pr_number, decision, feedback): log_error("Failed to submit review") return result # Merge if approved merged = False if decision == "APPROVE": merged = merge_pr(org, repo, pr_number) result['merged'] = merged # Record in database duration = int(time.time() - start_time) record_review(org, repo, pr_number, commit_sha, decision, pr_data, feedback, merged, duration) log("SUCCESS", f"Completed review of PR #{pr_number} in {duration}s", Colors.GREEN) # Cleanup run_command(["git", "checkout", base_branch], check=False) run_command(["git", "stash", "pop"], check=False) return result def main(): """Main execution""" log("INFO", "Starting Optimized PR Review Automation", Colors.CYAN) log("INFO", "=" * 60, Colors.CYAN) run_start = time.time() # Initialize if not check_prerequisites(): sys.exit(1) init_database() # Fetch all PRs using global search (FAST!) all_prs = fetch_all_review_prs() if not all_prs: log("INFO", "No PRs to review", Colors.GREEN) return # Group by org for processing prs_by_org = {} for pr in all_prs: # Parse org/repo from nameWithOwner field repo_full = pr['repository']['nameWithOwner'] org, repo_name = repo_full.split('/', 1) if org in ORGS: if org not in prs_by_org: prs_by_org[org] = [] # Add parsed org and repo to PR data pr['_parsed_org'] = org pr['_parsed_repo'] = repo_name prs_by_org[org].append(pr) # Stats stats = { 'total_found': len(all_prs), 'reviewed': 0, 'approved': 0, 'changes_requested': 0, 'merged': 0, 'skipped': 0, 'errors': 0 } # Process each org for org, prs in prs_by_org.items(): log("INFO", f"\nProcessing {len(prs)} PRs for {org}", Colors.BLUE) org_dir = ORGS[org] org_dir.mkdir(parents=True, exist_ok=True) reviewed_count = 0 found_count = len(prs) for pr in prs: result = review_pr(pr, org_dir) if result['reviewed']: stats['reviewed'] += 1 reviewed_count += 1 if result['decision'] == 'APPROVE': stats['approved'] += 1 elif 'REQUEST' in result['decision']: stats['changes_requested'] += 1 if result['merged']: stats['merged'] += 1 else: if result.get('decision') and 'SKIP' in result['decision']: stats['skipped'] += 1 else: stats['errors'] += 1 # Update repo state repo = pr['repository']['name'] update_repo_state(org, repo, found_count, reviewed_count) # Record automation run duration = int(time.time() - run_start) conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute(""" INSERT INTO automation_runs (run_timestamp, total_prs_found, prs_reviewed, prs_approved, prs_changes_requested, prs_merged, prs_skipped, errors, run_duration_seconds) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, (datetime.now().isoformat(), stats['total_found'], stats['reviewed'], stats['approved'], stats['changes_requested'], stats['merged'], stats['skipped'], stats['errors'], duration)) conn.commit() conn.close() # Print summary log("INFO", "\n" + "=" * 60, Colors.CYAN) log("SUCCESS", "PR Review Automation Completed!", Colors.GREEN) log("INFO", "=" * 60, Colors.CYAN) log("INFO", f"Total PRs found: {stats['total_found']}", Colors.BLUE) log("INFO", f"Reviewed: {stats['reviewed']}", Colors.GREEN) log("INFO", f" - Approved: {stats['approved']}", Colors.GREEN) log("INFO", f" - Changes requested: {stats['changes_requested']}", Colors.YELLOW) log("INFO", f" - Merged: {stats['merged']}", Colors.GREEN) log("INFO", f"Skipped: {stats['skipped']}", Colors.YELLOW) log("INFO", f"Errors: {stats['errors']}", Colors.RED) log("INFO", f"Duration: {duration}s", Colors.BLUE) log("INFO", "=" * 60, Colors.CYAN) if __name__ == "__main__": try: main() except KeyboardInterrupt: log("WARNING", "\nInterrupted by user", Colors.YELLOW) sys.exit(130) except Exception as e: log_error(f"Fatal error: {e}") import traceback traceback.print_exc() sys.exit(1) -
sandeepkunkunuru revised this gist
Oct 25, 2025 . 1 changed file with 20 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -546,6 +546,26 @@ def review_pr(pr_data: Dict, org_dir: Path) -> Dict: base_branch = pr_info.get('baseRefName', 'main') commit_sha = pr_info.get('headRefOid', 'unknown') # CRITICAL: Ensure we're on the latest commit of the PR log("INFO", f"Ensuring we're on the latest commit: {commit_sha[:8]}...", Colors.CYAN) try: run_command(["git", "fetch", "origin", f"pull/{pr_number}/head"]) run_command(["git", "reset", "--hard", commit_sha]) # Verify we're on the correct commit current_sha = run_command(["git", "rev-parse", "HEAD"]) if current_sha != commit_sha: log_error(f"Failed to checkout correct commit. Expected {commit_sha[:8]}, got {current_sha[:8]}") record_review(org, repo, pr_number, commit_sha, "ERROR", pr_data, "Failed to checkout correct commit", duration=int(time.time() - start_time)) return result log("SUCCESS", f"Verified on correct commit: {commit_sha[:8]}", Colors.GREEN) except Exception as e: log_error(f"Failed to ensure latest commit: {e}") record_review(org, repo, pr_number, commit_sha, "ERROR", pr_data, f"Failed to ensure latest commit: {e}", duration=int(time.time() - start_time)) return result # Now check if we've already reviewed this specific commit if already_reviewed(org, repo, pr_number, commit_sha): log("INFO", f"Already reviewed commit {commit_sha[:8]} - skipping", Colors.YELLOW) -
sandeepkunkunuru created this gist
Oct 23, 2025 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,725 @@ #!/usr/bin/env python3 """ Optimized PR Review Automation Fast, cheap, and smart PR review automation using state tracking This script automatically reviews PRs where you're requested as a reviewer using: - GitHub CLI (gh) for PR management - Claude CLI for AI-powered code reviews - SQLite for state tracking and avoiding duplicate reviews - Smart staleness detection to skip old/inactive PRs """ import json import subprocess import sqlite3 import sys import tempfile from datetime import datetime, timedelta from pathlib import Path from typing import Dict, List, Optional, Tuple import time import os # Configuration - UPDATE THESE FOR YOUR SETUP BASE_DIR = Path.home() / "pr-automation" # Where to store repos and database DB_PATH = BASE_DIR / "pr-reviews.db" LOG_PATH = BASE_DIR / "pr-review.log" ERROR_LOG_PATH = BASE_DIR / "pr-review-error.log" # Configure your organizations and where to clone their repos # Example: "YourOrgName": BASE_DIR / "YourOrgName" ORGS = { "your-org": BASE_DIR / "your-org", "another-org": BASE_DIR / "another-org" } # Staleness thresholds MAX_PR_AGE_DAYS = 60 MAX_INACTIVITY_DAYS = 21 # Colors for output class Colors: RED = '\033[0;31m' GREEN = '\033[0;32m' YELLOW = '\033[1;33m' BLUE = '\033[0;34m' CYAN = '\033[0;36m' NC = '\033[0m' def log(level: str, message: str, color: str = Colors.NC): """Log message to console and file""" timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") formatted = f"{color}[{level}]{Colors.NC} {message}" plain = f"[{level}] {message}" print(formatted) # Append to log file with open(LOG_PATH, 'a') as f: f.write(f"{timestamp} {plain}\n") def log_error(message: str): """Log error to both logs""" log("ERROR", message, Colors.RED) with open(ERROR_LOG_PATH, 'a') as f: f.write(f"{datetime.now().isoformat()} [ERROR] {message}\n") def run_command(cmd: List[str], check: bool = True) -> Optional[str]: """Run shell command and return output""" try: result = subprocess.run( cmd, capture_output=True, text=True, check=check ) return result.stdout.strip() except subprocess.CalledProcessError as e: if check: log_error(f"Command failed: {' '.join(cmd)}\n{e.stderr}") return None except Exception as e: log_error(f"Command error: {' '.join(cmd)}\n{str(e)}") return None def init_database(): """Initialize or update database schema""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() # Create repo_state table for tracking cursor.execute(""" CREATE TABLE IF NOT EXISTS repo_state ( org TEXT NOT NULL, repo TEXT NOT NULL, last_checked_at TEXT, last_pr_found_at TEXT, total_prs_seen INTEGER DEFAULT 0, total_prs_reviewed INTEGER DEFAULT 0, active_score REAL DEFAULT 1.0, PRIMARY KEY (org, repo) ) """) # Create reviewed_commits table for fast lookup cursor.execute(""" CREATE TABLE IF NOT EXISTS reviewed_commits ( org TEXT NOT NULL, repo TEXT NOT NULL, pr_number INTEGER NOT NULL, commit_sha TEXT NOT NULL, reviewed_at TEXT NOT NULL, decision TEXT, PRIMARY KEY (org, repo, pr_number, commit_sha) ) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_reviewed_commits_sha ON reviewed_commits(org, repo, commit_sha) """) # Create pr_reviews table for full review history cursor.execute(""" CREATE TABLE IF NOT EXISTS pr_reviews ( id INTEGER PRIMARY KEY AUTOINCREMENT, review_timestamp TEXT NOT NULL, org TEXT NOT NULL, repo TEXT NOT NULL, pr_number INTEGER NOT NULL, pr_title TEXT, pr_author TEXT, pr_url TEXT, pr_created_at TEXT, pr_updated_at TEXT, commit_sha TEXT NOT NULL, decision TEXT NOT NULL, feedback TEXT, merged INTEGER DEFAULT 0, merge_timestamp TEXT, run_duration_seconds INTEGER ) """) # Create automation_runs table for tracking script runs cursor.execute(""" CREATE TABLE IF NOT EXISTS automation_runs ( id INTEGER PRIMARY KEY AUTOINCREMENT, run_timestamp TEXT NOT NULL, total_prs_found INTEGER, prs_reviewed INTEGER, prs_approved INTEGER, prs_changes_requested INTEGER, prs_merged INTEGER, prs_skipped INTEGER, errors INTEGER, run_duration_seconds INTEGER ) """) conn.commit() conn.close() log("INFO", "Database initialized", Colors.GREEN) def check_prerequisites() -> bool: """Check if all required tools are installed""" log("INFO", "Checking prerequisites...", Colors.BLUE) tools = { "gh": "GitHub CLI (brew install gh)", "git": "Git", "claude": "Claude CLI (https://claude.com/download)" } for tool, install_msg in tools.items(): if not run_command(["command", "-v", tool], check=False): log_error(f"{tool} not found. Install: {install_msg}") return False # Check GH auth if run_command(["gh", "auth", "status"], check=False) is None: log_error("Not authenticated with GitHub. Run: gh auth login") return False log("INFO", "All prerequisites met", Colors.GREEN) return True def get_current_user() -> str: """Get current GitHub username""" output = run_command(["gh", "api", "user", "-q", ".login"]) return output if output else "unknown" def is_pr_stale(created_at: str, updated_at: str) -> Tuple[bool, Optional[str]]: """Check if PR is stale""" try: created = datetime.fromisoformat(created_at.replace('Z', '+00:00')) updated = datetime.fromisoformat(updated_at.replace('Z', '+00:00')) now = datetime.now(created.tzinfo) age_days = (now - created).days inactive_days = (now - updated).days if age_days > MAX_PR_AGE_DAYS: return True, f"PR is {age_days} days old (max: {MAX_PR_AGE_DAYS})" if inactive_days > MAX_INACTIVITY_DAYS: return True, f"Inactive for {inactive_days} days (max: {MAX_INACTIVITY_DAYS})" return False, None except Exception as e: log_error(f"Error checking staleness: {e}") return False, None def already_reviewed(org: str, repo: str, pr_number: int, commit_sha: str) -> bool: """Check if we've already reviewed this specific commit""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute(""" SELECT 1 FROM reviewed_commits WHERE org = ? AND repo = ? AND pr_number = ? AND commit_sha = ? """, (org, repo, pr_number, commit_sha)) exists = cursor.fetchone() is not None conn.close() return exists def record_review(org: str, repo: str, pr_number: int, commit_sha: str, decision: str, pr_data: Dict, feedback: str = None, merged: bool = False, duration: int = 0): """Record a review in the database""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() now = datetime.now().isoformat() # Record in reviewed_commits for fast lookup cursor.execute(""" INSERT OR REPLACE INTO reviewed_commits (org, repo, pr_number, commit_sha, reviewed_at, decision) VALUES (?, ?, ?, ?, ?, ?) """, (org, repo, pr_number, commit_sha, now, decision)) # Record full details in pr_reviews author = pr_data.get('author', {}).get('login') if isinstance(pr_data.get('author'), dict) else pr_data.get('author') cursor.execute(""" INSERT INTO pr_reviews (review_timestamp, org, repo, pr_number, pr_title, pr_author, pr_url, pr_created_at, pr_updated_at, commit_sha, decision, feedback, merged, merge_timestamp, run_duration_seconds) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( now, org, repo, pr_number, pr_data.get('title'), author, pr_data.get('url'), pr_data.get('createdAt'), pr_data.get('updatedAt'), commit_sha, decision, feedback, 1 if merged else 0, now if merged else None, duration )) conn.commit() conn.close() def update_repo_state(org: str, repo: str, found_prs: int, reviewed_prs: int): """Update repository state tracking""" conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() now = datetime.now().isoformat() cursor.execute(""" INSERT INTO repo_state (org, repo, last_checked_at, last_pr_found_at, total_prs_seen, total_prs_reviewed, active_score) VALUES (?, ?, ?, ?, ?, ?, 1.0) ON CONFLICT(org, repo) DO UPDATE SET last_checked_at = ?, last_pr_found_at = CASE WHEN ? > 0 THEN ? ELSE last_pr_found_at END, total_prs_seen = total_prs_seen + ?, total_prs_reviewed = total_prs_reviewed + ?, active_score = CASE WHEN ? > 0 THEN MIN(active_score * 1.1, 2.0) ELSE MAX(active_score * 0.9, 0.1) END """, (org, repo, now, now if found_prs > 0 else None, found_prs, reviewed_prs, now, found_prs, now, found_prs, reviewed_prs, found_prs)) conn.commit() conn.close() def fetch_all_review_prs() -> List[Dict]: """Fetch all PRs where current user is requested as reviewer using global search""" log("INFO", "Fetching PRs using global search (fast method)...", Colors.CYAN) # Use GitHub's search API via gh CLI - ONE API CALL instead of looping repos! cmd = [ "gh", "search", "prs", "--review-requested", "@me", "--state", "open", "--json", "number,title,repository,author,createdAt,updatedAt,url", "--limit", "100" ] output = run_command(cmd) if not output: log("WARNING", "No PRs found or search failed", Colors.YELLOW) return [] try: prs = json.loads(output) log("INFO", f"Found {len(prs)} PRs across all orgs", Colors.GREEN) return prs except json.JSONDecodeError as e: log_error(f"Failed to parse PR search results: {e}") return [] def sync_repo(org: str, repo: str, repo_dir: Path) -> bool: """Clone or update repository""" repo_path = repo_dir / repo try: if repo_path.exists(): log("INFO", f"Updating repo: {org}/{repo}", Colors.BLUE) os.chdir(repo_path) run_command(["git", "fetch", "--all", "--prune"]) else: log("INFO", f"Cloning repo: {org}/{repo}", Colors.BLUE) repo_dir.mkdir(parents=True, exist_ok=True) os.chdir(repo_dir) run_command(["gh", "repo", "clone", f"{org}/{repo}"]) os.chdir(repo) return True except Exception as e: log_error(f"Failed to sync repo {org}/{repo}: {e}") return False def get_pr_diff(pr_number: int, base_branch: str) -> Optional[str]: """Get PR diff""" try: # Fetch base branch run_command(["git", "fetch", "origin", base_branch], check=False) # Get diff diff = run_command(["git", "diff", f"origin/{base_branch}...HEAD"]) return diff except Exception as e: log_error(f"Failed to get diff for PR #{pr_number}: {e}") return None def review_with_claude(pr_data: Dict, diff: str, pr_info: Dict) -> Tuple[Optional[str], Optional[str]]: """Use Claude to review the PR""" # Parse org/repo from nameWithOwner repo_full = pr_data['repository']['nameWithOwner'] # Create review context context = f"""# PR Review Context ## Repository: {repo_full} ## PR #{pr_data['number']}: {pr_data['title']} ## Author: @{pr_data['author']['login']} ## URL: {pr_data['url']} ## Description {pr_info.get('body', 'No description provided')} ## Reviews {json.dumps(pr_info.get('reviews', []), indent=2)} ## Comments {json.dumps(pr_info.get('comments', []), indent=2)} ## Diff Summary {run_command(['git', 'diff', '--stat', f"origin/{pr_info.get('baseRefName', 'main')}...HEAD"]) or 'N/A'} ## Full Diff ```diff {diff} ``` --- Please review this PR thoroughly: 1. Check code quality, style, and best practices 2. Look for potential bugs or security issues 3. Verify if tests are included and appropriate 4. Review the conversation history for any concerns 5. Provide a decision: APPROVE or REQUEST_CHANGES 6. If requesting changes, provide specific, actionable feedback Format your response as: DECISION: [APPROVE|REQUEST_CHANGES] FEEDBACK: [Your detailed feedback here] """ # Write to temp file with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f: f.write(context) temp_path = f.name try: log("INFO", "Running Claude review...", Colors.CYAN) result = run_command(["claude", "-p", context], check=False) if not result: return None, None # Parse decision decision_line = [line for line in result.split('\n') if 'DECISION:' in line.upper()] decision = None if decision_line: decision = decision_line[0].split(':', 1)[1].strip().replace('*', '').upper() # Parse feedback feedback_parts = result.split('FEEDBACK:', 1) feedback = feedback_parts[1].strip() if len(feedback_parts) > 1 else result return decision, feedback finally: Path(temp_path).unlink(missing_ok=True) def submit_review(org: str, repo: str, pr_number: int, decision: str, feedback: str) -> bool: """Submit review to GitHub""" try: if decision == "APPROVE": run_command(["gh", "pr", "review", str(pr_number), "--repo", f"{org}/{repo}", "--approve", "--body", feedback]) return True elif "REQUEST" in decision: run_command(["gh", "pr", "review", str(pr_number), "--repo", f"{org}/{repo}", "--request-changes", "--body", feedback]) return True return False except Exception as e: log_error(f"Failed to submit review: {e}") return False def merge_pr(org: str, repo: str, pr_number: int) -> bool: """Merge PR with squash""" try: log("INFO", f"Attempting to merge PR #{pr_number}...", Colors.CYAN) result = run_command([ "gh", "pr", "merge", str(pr_number), "--repo", f"{org}/{repo}", "--squash", "--delete-branch" ], check=False) if result is not None: log("SUCCESS", f"PR #{pr_number} merged successfully!", Colors.GREEN) return True else: log("WARNING", f"Failed to merge PR #{pr_number}", Colors.YELLOW) return False except Exception as e: log_error(f"Error merging PR #{pr_number}: {e}") return False def review_pr(pr_data: Dict, org_dir: Path) -> Dict: """Review a single PR""" # Use parsed org/repo from main() org = pr_data.get('_parsed_org') repo = pr_data.get('_parsed_repo') # Fallback if not parsed yet if not org or not repo: repo_full = pr_data['repository']['nameWithOwner'] org, repo = repo_full.split('/', 1) pr_number = pr_data['number'] start_time = time.time() result = { 'org': org, 'repo': repo, 'pr_number': pr_number, 'reviewed': False, 'decision': None, 'merged': False } log("INFO", "=" * 60, Colors.BLUE) log("INFO", f"Processing PR #{pr_number} in {org}/{repo}", Colors.BLUE) log("INFO", f"Title: {pr_data['title']}", Colors.BLUE) log("INFO", "=" * 60, Colors.BLUE) # Check staleness first (before fetching more data) is_stale, stale_reason = is_pr_stale(pr_data['createdAt'], pr_data['updatedAt']) if is_stale: log("WARNING", f"Skipping stale PR: {stale_reason}", Colors.YELLOW) # Use a placeholder commit_sha for skipped stale PRs record_review(org, repo, pr_number, "stale", "SKIP_STALE", pr_data, stale_reason, duration=int(time.time() - start_time)) return result # Sync repo if not sync_repo(org, repo, org_dir): log_error(f"Failed to sync repo {org}/{repo}") record_review(org, repo, pr_number, "error", "ERROR", pr_data, "Failed to sync repo", duration=int(time.time() - start_time)) return result repo_path = org_dir / repo os.chdir(repo_path) # Stash and checkout PR run_command(["git", "stash", "push", "-u", "-m", "Auto-stash"], check=False) try: run_command(["gh", "pr", "checkout", str(pr_number)]) except Exception as e: log_error(f"Failed to checkout PR #{pr_number}: {e}") record_review(org, repo, pr_number, "error", "ERROR", pr_data, f"Failed to checkout: {e}", duration=int(time.time() - start_time)) return result # Get PR details including commit SHA pr_info_json = run_command([ "gh", "pr", "view", str(pr_number), "--json", "title,body,author,baseRefName,reviews,comments,headRefOid" ]) if not pr_info_json: log_error("Failed to get PR details") return result pr_info = json.loads(pr_info_json) base_branch = pr_info.get('baseRefName', 'main') commit_sha = pr_info.get('headRefOid', 'unknown') # Now check if we've already reviewed this specific commit if already_reviewed(org, repo, pr_number, commit_sha): log("INFO", f"Already reviewed commit {commit_sha[:8]} - skipping", Colors.YELLOW) record_review(org, repo, pr_number, commit_sha, "SKIP_ALREADY_REVIEWED", pr_data, "Already reviewed this commit", duration=int(time.time() - start_time)) return result # Get diff diff = get_pr_diff(pr_number, base_branch) if not diff: log_error("Failed to get PR diff") record_review(org, repo, pr_number, commit_sha, "ERROR", pr_data, "Failed to get diff", duration=int(time.time() - start_time)) return result # Review with Claude decision, feedback = review_with_claude(pr_data, diff, pr_info) if not decision: log_error("Failed to get decision from Claude") record_review(org, repo, pr_number, commit_sha, "ERROR", pr_data, "Claude review failed", duration=int(time.time() - start_time)) return result log("INFO", f"Claude's decision: {decision}", Colors.CYAN) result['decision'] = decision result['reviewed'] = True # Submit review if not submit_review(org, repo, pr_number, decision, feedback): log_error("Failed to submit review") return result # Merge if approved merged = False if decision == "APPROVE": merged = merge_pr(org, repo, pr_number) result['merged'] = merged # Record in database duration = int(time.time() - start_time) record_review(org, repo, pr_number, commit_sha, decision, pr_data, feedback, merged, duration) log("SUCCESS", f"Completed review of PR #{pr_number} in {duration}s", Colors.GREEN) # Cleanup run_command(["git", "checkout", base_branch], check=False) run_command(["git", "stash", "pop"], check=False) return result def main(): """Main execution""" log("INFO", "Starting Optimized PR Review Automation", Colors.CYAN) log("INFO", "=" * 60, Colors.CYAN) run_start = time.time() # Initialize if not check_prerequisites(): sys.exit(1) # Ensure BASE_DIR exists BASE_DIR.mkdir(parents=True, exist_ok=True) init_database() # Fetch all PRs using global search (FAST!) all_prs = fetch_all_review_prs() if not all_prs: log("INFO", "No PRs to review", Colors.GREEN) return # Group by org for processing prs_by_org = {} for pr in all_prs: # Parse org/repo from nameWithOwner field repo_full = pr['repository']['nameWithOwner'] org, repo_name = repo_full.split('/', 1) if org in ORGS: if org not in prs_by_org: prs_by_org[org] = [] # Add parsed org and repo to PR data pr['_parsed_org'] = org pr['_parsed_repo'] = repo_name prs_by_org[org].append(pr) # Stats stats = { 'total_found': len(all_prs), 'reviewed': 0, 'approved': 0, 'changes_requested': 0, 'merged': 0, 'skipped': 0, 'errors': 0 } # Process each org for org, prs in prs_by_org.items(): log("INFO", f"\nProcessing {len(prs)} PRs for {org}", Colors.BLUE) org_dir = ORGS[org] org_dir.mkdir(parents=True, exist_ok=True) reviewed_count = 0 found_count = len(prs) for pr in prs: result = review_pr(pr, org_dir) if result['reviewed']: stats['reviewed'] += 1 reviewed_count += 1 if result['decision'] == 'APPROVE': stats['approved'] += 1 elif 'REQUEST' in result['decision']: stats['changes_requested'] += 1 if result['merged']: stats['merged'] += 1 else: if result.get('decision') and 'SKIP' in result['decision']: stats['skipped'] += 1 else: stats['errors'] += 1 # Update repo state if prs: repo = prs[0]['repository']['name'] update_repo_state(org, repo, found_count, reviewed_count) # Record automation run duration = int(time.time() - run_start) conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute(""" INSERT INTO automation_runs (run_timestamp, total_prs_found, prs_reviewed, prs_approved, prs_changes_requested, prs_merged, prs_skipped, errors, run_duration_seconds) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, (datetime.now().isoformat(), stats['total_found'], stats['reviewed'], stats['approved'], stats['changes_requested'], stats['merged'], stats['skipped'], stats['errors'], duration)) conn.commit() conn.close() # Print summary log("INFO", "\n" + "=" * 60, Colors.CYAN) log("SUCCESS", "PR Review Automation Completed!", Colors.GREEN) log("INFO", "=" * 60, Colors.CYAN) log("INFO", f"Total PRs found: {stats['total_found']}", Colors.BLUE) log("INFO", f"Reviewed: {stats['reviewed']}", Colors.GREEN) log("INFO", f" - Approved: {stats['approved']}", Colors.GREEN) log("INFO", f" - Changes requested: {stats['changes_requested']}", Colors.YELLOW) log("INFO", f" - Merged: {stats['merged']}", Colors.GREEN) log("INFO", f"Skipped: {stats['skipped']}", Colors.YELLOW) log("INFO", f"Errors: {stats['errors']}", Colors.RED) log("INFO", f"Duration: {duration}s", Colors.BLUE) log("INFO", "=" * 60, Colors.CYAN) if __name__ == "__main__": try: main() except KeyboardInterrupt: log("WARNING", "\nInterrupted by user", Colors.YELLOW) sys.exit(130) except Exception as e: log_error(f"Fatal error: {e}") import traceback traceback.print_exc() sys.exit(1)