"""Automated patch pipeline.

Asks an Ollama WebSocket endpoint for a patch and a unit test for a reported
vulnerability, records every attempt in a local SQLite database, opens a
GitHub pull request with the result and reports the outcome to Slack.
"""

import asyncio
import json
import logging
import re
import sqlite3
import time

import colorlog
import requests
import websocket
from github import Github, GithubException
from slack_sdk import WebClient


class SecureCICD:
    """Orchestrates patch generation, persistence, PR creation and notification."""

    def __init__(self, github_token, repo_name, slack_token, slack_channel, ollama_url):
        """Connect to GitHub and Slack, open the history database, set up logging.

        Args:
            github_token: Token passed to the ``Github`` client.
            repo_name: Repository identifier passed to ``Github.get_repo``.
            slack_token: Token passed to the Slack ``WebClient``.
            slack_channel: Channel that receives status notifications.
            ollama_url: WebSocket URL used by ``connect_websocket``.
        """
        # Logging first, so that every later step can rely on self.logger.
        self.setup_logging()
        self.github = Github(github_token)
        self.repo = self.github.get_repo(repo_name)
        self.slack = WebClient(token=slack_token)
        self.slack_channel = slack_channel
        self.ollama_url = ollama_url
        self.ws = None
        self.db_init()

    def setup_logging(self):
        """Attach a coloured stream handler to the root logger (once) at INFO level."""
        self.logger = colorlog.getLogger()
        # The root logger is process-wide: without this guard each new instance
        # would add another handler and every message would be printed N times.
        already_installed = any(
            getattr(h, "_secure_cicd_handler", False) for h in self.logger.handlers
        )
        if not already_installed:
            handler = colorlog.StreamHandler()
            handler.setFormatter(colorlog.ColoredFormatter(
                '%(log_color)s%(asctime)s - %(levelname)s - %(message)s',
                log_colors={
                    'DEBUG': 'cyan',
                    'INFO': 'green',
                    'WARNING': 'yellow',
                    'ERROR': 'red',
                    'CRITICAL': 'bold_red',
                },
            ))
            handler._secure_cicd_handler = True
            self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    def db_init(self):
        """Open ``history.db`` and create the ``patches`` table if it is missing."""
        self.conn = sqlite3.connect("history.db", check_same_thread=False)
        self.cursor = self.conn.cursor()
        self.cursor.execute('''CREATE TABLE IF NOT EXISTS patches (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            issue TEXT,
            patch TEXT,
            test TEXT,
            status TEXT
        )''')
        self.conn.commit()

    def get_patch_history(self, issue):
        """Return all ``(patch, test, status)`` rows stored for ``issue``."""
        self.cursor.execute(
            "SELECT patch, test, status FROM patches WHERE issue = ?", (issue,)
        )
        return self.cursor.fetchall()

    def notify_slack(self, message):
        """Post ``message`` to the configured channel; failures are logged, not raised."""
        try:
            self.slack.chat_postMessage(channel=self.slack_channel, text=message)
            self.logger.info("Slack notification sent: %s", message)
        except Exception as e:
            # Notifications are best-effort: never let Slack break the pipeline.
            self.logger.error("Failed to send Slack notification: %s", e)

    async def connect_websocket(self):
        """Open the WebSocket to ``ollama_url``; on failure ``self.ws`` stays ``None``."""
        try:
            self.ws = websocket.create_connection(self.ollama_url)
            self.logger.info("WebSocket connection established with Ollama")
        except Exception as e:
            self.logger.error("Error connecting to WebSocket: %s", e)
            self.ws = None

    def _close_websocket_now(self):
        """Close the socket if one is open and forget it so later calls reconnect."""
        if not self.ws:
            return
        try:
            self.ws.close()
        finally:
            # Never keep a reference to a closed (or half-closed) socket.
            self.ws = None
        self.logger.info("WebSocket connection closed")

    def _discard_websocket(self):
        """Best-effort close used on error paths; a failing close is only logged."""
        try:
            self._close_websocket_now()
        except Exception as e:
            self.logger.warning("Error closing WebSocket: %s", e)

    async def close_websocket(self):
        """Close the WebSocket connection if one is open."""
        self._close_websocket_now()

    async def request_patch(self, issue, retries=3, retry_delay=2):
        """Ask the model for a patch and a unit test for ``issue``.

        The prompt includes every previous attempt stored for the same issue.
        The reply is expected to be a JSON object with non-empty ``"patch"``
        and ``"test"`` entries.

        Args:
            issue: Description of the vulnerability.
            retries: Maximum number of request attempts.
            retry_delay: Seconds to wait between attempts.

        Returns:
            ``(patch, test)`` on success, ``(None, None)`` when the connection
            cannot be established or no attempt yields a complete reply.
        """
        history = self.get_patch_history(issue)
        history_str = "\n".join(
            f"Patch: {h[0]}, Test: {h[1]}, Status: {h[2]}" for h in history
        )
        prompt = {
            "prompt": (
                f"Genera un parche y un test unitario para la siguiente vulnerabilidad: {issue}.\n\n"
                f"Historial de intentos previos:\n{history_str}\n\n"
                "Asegúrate de que el parche soluciona el problema y que el test verifica su corrección. "
                "La solución debe ser clara, mantenible y probada antes de enviarse."
            )
        }
        payload = json.dumps(prompt)

        for attempt in range(retries):
            # (Re)connect on demand: the socket is dropped after a failed attempt.
            if not self.ws:
                await self.connect_websocket()
                if not self.ws:
                    return None, None
            try:
                # NOTE: send/recv are blocking calls and stall the event loop
                # while the model is generating.
                self.ws.send(payload)
                response = json.loads(self.ws.recv())
                patch, test = response.get("patch"), response.get("test")
                if patch and test:
                    return patch, test
                self.logger.warning(
                    "Received incomplete response from Ollama, retrying (%d/%d)",
                    attempt + 1, retries,
                )
            except Exception as e:
                self.logger.error("Error communicating with Ollama: %s", e)
                # The connection state is unknown after a failure; retrying on
                # the same socket would most likely fail again.
                self._discard_websocket()
            if attempt + 1 < retries:
                await asyncio.sleep(retry_delay)
        return None, None

    def store_patch(self, issue, patch, test, status="pending"):
        """Insert one attempt into ``patches``; database errors are logged, not raised."""
        try:
            self.cursor.execute(
                "INSERT INTO patches (issue, patch, test, status) VALUES (?, ?, ?, ?)",
                (issue, patch, test, status),
            )
            self.conn.commit()
            self.logger.info("Patch stored for issue: %s with status %s", issue, status)
        except Exception as e:
            self.logger.error("Error storing patch in database: %s", e)

    def _upsert_file(self, path, message, content, branch):
        """Update ``path`` on ``branch``, or create it when it does not exist yet."""
        try:
            existing = self.repo.get_contents(path, ref=branch)
        except GithubException as e:
            if e.status != 404:
                raise
            self.repo.create_file(path, message, content, branch=branch)
        else:
            self.repo.update_file(existing.path, message, content, existing.sha, branch=branch)

    def create_pull_request(self, patch, test, branch_name, base_branch="main"):
        """Commit the patch and test on a new branch and open a pull request.

        Args:
            patch: Content written to ``patch.py``.
            test: Content written to ``test_patch.py``.
            branch_name: Name of the branch to create from ``base_branch``.
            base_branch: Branch the new branch starts from and the PR targets.

        Returns:
            The pull request URL, or ``None`` if any step failed (the error is
            logged and a branch created by this call is removed again).
        """
        new_ref = None
        try:
            base = self.repo.get_branch(base_branch)
            new_ref = self.repo.create_git_ref(
                ref=f'refs/heads/{branch_name}', sha=base.commit.sha
            )
            self._upsert_file("patch.py", "Applying patch", patch, branch_name)
            self._upsert_file("test_patch.py", "Adding test", test, branch_name)
            pr = self.repo.create_pull(
                title=f"Patch for {branch_name}",
                body="Automated patch and test submission",
                head=branch_name,
                base=base_branch,
            )
        except Exception as e:
            self.logger.error("Error creating PR: %s", e)
            if new_ref is not None:
                # Do not leave a half-populated branch behind: it would make
                # create_git_ref fail on every later run for the same issue.
                try:
                    new_ref.delete()
                except Exception as cleanup_error:
                    self.logger.warning(
                        "Could not delete branch %s: %s", branch_name, cleanup_error
                    )
            return None
        self.logger.info("Pull request created: %s", pr.html_url)
        return pr.html_url

    @staticmethod
    def _branch_name_for(issue):
        """Build a branch name from free text, keeping only ``[A-Za-z0-9_-]``."""
        # One-for-one replacement, so plain "words with spaces" map exactly as
        # a simple space-to-dash replacement would.
        return "patch-" + re.sub(r"[^A-Za-z0-9_-]", "-", issue)

    async def process_vulnerability(self, issue):
        """Run the full pipeline for one issue: generate, store, open PR, notify."""
        patch, test = await self.request_patch(issue)
        if not (patch and test):
            self.notify_slack(f"Failed to generate patch for {issue}")
            self.store_patch(issue, None, None, "generation_failed")
            return

        self.store_patch(issue, patch, test, "generated")
        branch_name = self._branch_name_for(issue)
        pr_url = self.create_pull_request(patch, test, branch_name)
        if pr_url:
            self.store_patch(issue, patch, test, "PR_created")
            self.notify_slack(f"PR created: {pr_url}")
        else:
            self.notify_slack(f"Failed to create PR for {issue}")
            self.store_patch(issue, patch, test, "failed_PR_creation")

    def close(self):
        """Close the WebSocket and the database connection."""
        try:
            # Closed synchronously: asyncio.run() would raise if close() were
            # called while an event loop is already running.
            self._close_websocket_now()
        finally:
            self.conn.close()
        self.logger.info("Database and WebSocket connections closed")


# Example usage
if __name__ == "__main__":
    cicd = SecureCICD("github_token", "repo_name", "slack_token", "slack_channel", "ws://ollama.local")
    try:
        asyncio.run(cicd.process_vulnerability("Critical security issue in auth module"))
    finally:
        # Release the database and socket even if processing raised.
        cicd.close()