Last active
February 26, 2025 13:03
-
-
Save alonsoir/c660e33681257ff95cce6d499372609f to your computer and use it in GitHub Desktop.
Revisions
-
alonsoir revised this gist
Feb 26, 2025 . No changes.There are no files selected for viewing
-
alonsoir created this gist
Feb 26, 2025 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,25 @@ # Dockerfile FROM python:3.8-slim # Establecer un usuario no root RUN adduser --disabled-password appuser USER appuser # Configurar el directorio de trabajo WORKDIR /app # Copiar solo los archivos necesarios para la instalación de dependencias COPY requirements.txt . # Instalar dependencias con un cache limpio RUN pip install --no-cache-dir -r requirements.txt # Copiar el resto del código COPY --chown=appuser:appuser . . # Exponer el puerto de la aplicación (ajustar según la aplicación) EXPOSE 8000 # Establecer el comando de ejecución CMD ["python", "your_app.py"] This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,62 @@ # .github/workflows/ci-cd-security.yml name: CI/CD Security Workflow on: push: branches: - main jobs: security-check: runs-on: ubuntu-latest steps: - name: Checkout code uses: actions/checkout@v3 - name: Set up Python uses: actions/setup-python@v4 with: python-version: '3.9' - name: Install dependencies run: | pip install -r requirements.txt - name: Run Static Code Analysis run: | flake8 . bandit -r . - name: Run Dependency Check run: | safety check - name: Run Tests run: | pytest - name: Execute Secure CICD Automation run: | python secure_cicd.py # Ejecuta el script SecureCICD - name: Build Docker Image run: | docker build -t my-app:latest . - name: Run Security Scan on Docker Image run: | docker run --rm -v /var/run/docker.sock:/var/run/docker.sock \ aquasec/trivy my-app:latest - name: Publish Image to Docker Registry if: success() && github.ref == 'refs/heads/main' run: | docker tag my-app:latest myregistry/my-app:latest docker push myregistry/my-app:latest - name: Notify Vulnerabilities if: failure() run: | echo "Vulnerabilities detected. Notifying team..." # Enviar notificación a Slack o a otro canal This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,11 @@ requests websocket-client colorlog PyGithub slack-sdk pytest flake8 bandit safety asyncio sqlite3 # No es necesario incluirlo, ya que es parte de la librería estándar de Python. This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,154 @@ import sqlite3 import json import requests import websocket import logging import time import asyncio import colorlog from github import Github from slack_sdk import WebClient class SecureCICD: def __init__(self, github_token, repo_name, slack_token, slack_channel, ollama_url): self.github = Github(github_token) self.repo = self.github.get_repo(repo_name) self.slack = WebClient(token=slack_token) self.slack_channel = slack_channel self.ollama_url = ollama_url self.ws = None self.db_init() self.setup_logging() def setup_logging(self): handler = colorlog.StreamHandler() handler.setFormatter(colorlog.ColoredFormatter('%(log_color)s%(asctime)s - %(levelname)s - %(message)s', log_colors={ 'DEBUG': 'cyan', 'INFO': 'green', 'WARNING': 'yellow', 'ERROR': 'red', 'CRITICAL': 'bold_red' })) self.logger = colorlog.getLogger() self.logger.addHandler(handler) self.logger.setLevel(logging.INFO) def db_init(self): self.conn = sqlite3.connect("history.db", check_same_thread=False) self.cursor = self.conn.cursor() self.cursor.execute('''CREATE TABLE IF NOT EXISTS patches ( id INTEGER PRIMARY KEY AUTOINCREMENT, issue TEXT, patch TEXT, test TEXT, status TEXT )''') self.conn.commit() def get_patch_history(self, issue): self.cursor.execute("SELECT patch, test, status FROM patches WHERE issue = ?", (issue,)) return self.cursor.fetchall() def notify_slack(self, message): try: self.slack.chat_postMessage(channel=self.slack_channel, text=message) self.logger.info(f"Slack notification sent: {message}") except Exception as e: self.logger.error(f"Failed to send Slack notification: {str(e)}") async def connect_websocket(self): try: self.ws = websocket.create_connection(self.ollama_url) self.logger.info("WebSocket connection established with Ollama") except Exception as e: self.logger.error(f"Error connecting to WebSocket: {str(e)}") self.ws = None async def close_websocket(self): if self.ws: self.ws.close() self.logger.info("WebSocket connection closed") async def request_patch(self, issue): if not self.ws: await self.connect_websocket() if not self.ws: return None, None retries = 3 history = self.get_patch_history(issue) history_str = "\n".join([f"Patch: {h[0]}, Test: {h[1]}, Status: {h[2]}" for h in history]) for attempt in range(retries): try: prompt = { "prompt": f"Genera un parche y un test unitario para la siguiente vulnerabilidad: {issue}.\n\nHistorial de intentos previos:\n{history_str}\n\nAsegúrate de que el parche soluciona el problema y que el test verifica su corrección. La solución debe ser clara, mantenible y probada antes de enviarse." } self.ws.send(json.dumps(prompt)) response = json.loads(self.ws.recv()) patch, test = response.get("patch"), response.get("test") if patch and test: return patch, test self.logger.warning(f"Received incomplete response from Ollama, retrying ({attempt+1}/{retries})") await asyncio.sleep(2) except Exception as e: self.logger.error(f"Error communicating with Ollama: {str(e)}") return None, None def store_patch(self, issue, patch, test, status="pending"): try: self.cursor.execute("INSERT INTO patches (issue, patch, test, status) VALUES (?, ?, ?, ?)", (issue, patch, test, status)) self.conn.commit() self.logger.info(f"Patch stored for issue: {issue} with status {status}") except Exception as e: self.logger.error(f"Error storing patch in database: {str(e)}") def create_pull_request(self, patch, test, branch_name): try: main_branch = self.repo.get_branch("main") new_branch = self.repo.create_git_ref(ref=f'refs/heads/{branch_name}', sha=main_branch.commit.sha) patch_file = self.repo.get_contents("patch.py", ref=branch_name) test_file = self.repo.get_contents("test_patch.py", ref=branch_name) self.repo.update_file(patch_file.path, "Applying patch", patch, patch_file.sha, branch=branch_name) self.repo.update_file(test_file.path, "Adding test", test, test_file.sha, branch=branch_name) pr = self.repo.create_pull(title=f"Patch for {branch_name}", body="Automated patch and test submission", head=branch_name, base="main") self.logger.info(f"Pull request created: {pr.html_url}") return pr.html_url except Exception as e: self.logger.error(f"Error creating PR: {str(e)}") return None async def process_vulnerability(self, issue): patch, test = await self.request_patch(issue) if patch and test: self.store_patch(issue, patch, test, "generated") branch_name = f"patch-{issue.replace(' ', '-')}" pr_url = self.create_pull_request(patch, test, branch_name) if pr_url: self.store_patch(issue, patch, test, "PR_created") self.notify_slack(f"PR created: {pr_url}") else: self.notify_slack(f"Failed to create PR for {issue}") self.store_patch(issue, patch, test, "failed_PR_creation") else: self.notify_slack(f"Failed to generate patch for {issue}") self.store_patch(issue, None, None, "generation_failed") def close(self): self.conn.close() asyncio.run(self.close_websocket()) self.logger.info("Database and WebSocket connections closed") # Example usage if __name__ == "__main__": cicd = SecureCICD("github_token", "repo_name", "slack_token", "slack_channel", "ws://ollama.local") asyncio.run(cicd.process_vulnerability("Critical security issue in auth module")) cicd.close()