Skip to content

Instantly share code, notes, and snippets.

@alonsoir
Last active February 26, 2025 13:03
Show Gist options
  • Save alonsoir/c660e33681257ff95cce6d499372609f to your computer and use it in GitHub Desktop.
Save alonsoir/c660e33681257ff95cce6d499372609f to your computer and use it in GitHub Desktop.

Revisions

  1. alonsoir revised this gist Feb 26, 2025. No changes.
  2. alonsoir created this gist Feb 26, 2025.
    25 changes: 25 additions & 0 deletions Dockerfile
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,25 @@
    # Dockerfile

    FROM python:3.8-slim

    # Establecer un usuario no root
    RUN adduser --disabled-password appuser
    USER appuser

    # Configurar el directorio de trabajo
    WORKDIR /app

    # Copiar solo los archivos necesarios para la instalación de dependencias
    COPY requirements.txt .

    # Instalar dependencias con un cache limpio
    RUN pip install --no-cache-dir -r requirements.txt

    # Copiar el resto del código
    COPY --chown=appuser:appuser . .

    # Exponer el puerto de la aplicación (ajustar según la aplicación)
    EXPOSE 8000

    # Establecer el comando de ejecución
    CMD ["python", "your_app.py"]
    62 changes: 62 additions & 0 deletions gh.action
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,62 @@
    # .github/workflows/ci-cd-security.yml

    name: CI/CD Security Workflow

    on:
    push:
    branches:
    - main

    jobs:
    security-check:
    runs-on: ubuntu-latest
    steps:
    - name: Checkout code
    uses: actions/checkout@v3

    - name: Set up Python
    uses: actions/setup-python@v4
    with:
    python-version: '3.9'

    - name: Install dependencies
    run: |
    pip install -r requirements.txt

    - name: Run Static Code Analysis
    run: |
    flake8 .
    bandit -r .

    - name: Run Dependency Check
    run: |
    safety check

    - name: Run Tests
    run: |
    pytest

    - name: Execute Secure CICD Automation
    run: |
    python secure_cicd.py # Ejecuta el script SecureCICD

    - name: Build Docker Image
    run: |
    docker build -t my-app:latest .

    - name: Run Security Scan on Docker Image
    run: |
    docker run --rm -v /var/run/docker.sock:/var/run/docker.sock \
    aquasec/trivy my-app:latest

    - name: Publish Image to Docker Registry
    if: success() && github.ref == 'refs/heads/main'
    run: |
    docker tag my-app:latest myregistry/my-app:latest
    docker push myregistry/my-app:latest

    - name: Notify Vulnerabilities
    if: failure()
    run: |
    echo "Vulnerabilities detected. Notifying team..."
    # Enviar notificación a Slack o a otro canal
    11 changes: 11 additions & 0 deletions requirements.txt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,11 @@
    requests
    websocket-client
    colorlog
    PyGithub
    slack-sdk
    pytest
    flake8
    bandit
    safety
    asyncio
    sqlite3 # No es necesario incluirlo, ya que es parte de la librería estándar de Python.
    154 changes: 154 additions & 0 deletions secure_cicd.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,154 @@
    import sqlite3
    import json
    import requests
    import websocket
    import logging
    import time
    import asyncio
    import colorlog
    from github import Github
    from slack_sdk import WebClient

    class SecureCICD:
    def __init__(self, github_token, repo_name, slack_token, slack_channel, ollama_url):
    self.github = Github(github_token)
    self.repo = self.github.get_repo(repo_name)
    self.slack = WebClient(token=slack_token)
    self.slack_channel = slack_channel
    self.ollama_url = ollama_url
    self.ws = None
    self.db_init()
    self.setup_logging()

    def setup_logging(self):
    handler = colorlog.StreamHandler()
    handler.setFormatter(colorlog.ColoredFormatter('%(log_color)s%(asctime)s - %(levelname)s - %(message)s',
    log_colors={
    'DEBUG': 'cyan',
    'INFO': 'green',
    'WARNING': 'yellow',
    'ERROR': 'red',
    'CRITICAL': 'bold_red'
    }))
    self.logger = colorlog.getLogger()
    self.logger.addHandler(handler)
    self.logger.setLevel(logging.INFO)

    def db_init(self):
    self.conn = sqlite3.connect("history.db", check_same_thread=False)
    self.cursor = self.conn.cursor()
    self.cursor.execute('''CREATE TABLE IF NOT EXISTS patches (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    issue TEXT,
    patch TEXT,
    test TEXT,
    status TEXT
    )''')
    self.conn.commit()

    def get_patch_history(self, issue):
    self.cursor.execute("SELECT patch, test, status FROM patches WHERE issue = ?", (issue,))
    return self.cursor.fetchall()

    def notify_slack(self, message):
    try:
    self.slack.chat_postMessage(channel=self.slack_channel, text=message)
    self.logger.info(f"Slack notification sent: {message}")
    except Exception as e:
    self.logger.error(f"Failed to send Slack notification: {str(e)}")

    async def connect_websocket(self):
    try:
    self.ws = websocket.create_connection(self.ollama_url)
    self.logger.info("WebSocket connection established with Ollama")
    except Exception as e:
    self.logger.error(f"Error connecting to WebSocket: {str(e)}")
    self.ws = None

    async def close_websocket(self):
    if self.ws:
    self.ws.close()
    self.logger.info("WebSocket connection closed")

    async def request_patch(self, issue):
    if not self.ws:
    await self.connect_websocket()
    if not self.ws:
    return None, None

    retries = 3
    history = self.get_patch_history(issue)
    history_str = "\n".join([f"Patch: {h[0]}, Test: {h[1]}, Status: {h[2]}" for h in history])

    for attempt in range(retries):
    try:
    prompt = {
    "prompt": f"Genera un parche y un test unitario para la siguiente vulnerabilidad: {issue}.\n\nHistorial de intentos previos:\n{history_str}\n\nAsegúrate de que el parche soluciona el problema y que el test verifica su corrección. La solución debe ser clara, mantenible y probada antes de enviarse."
    }
    self.ws.send(json.dumps(prompt))
    response = json.loads(self.ws.recv())
    patch, test = response.get("patch"), response.get("test")
    if patch and test:
    return patch, test
    self.logger.warning(f"Received incomplete response from Ollama, retrying ({attempt+1}/{retries})")
    await asyncio.sleep(2)
    except Exception as e:
    self.logger.error(f"Error communicating with Ollama: {str(e)}")
    return None, None

    def store_patch(self, issue, patch, test, status="pending"):
    try:
    self.cursor.execute("INSERT INTO patches (issue, patch, test, status) VALUES (?, ?, ?, ?)",
    (issue, patch, test, status))
    self.conn.commit()
    self.logger.info(f"Patch stored for issue: {issue} with status {status}")
    except Exception as e:
    self.logger.error(f"Error storing patch in database: {str(e)}")

    def create_pull_request(self, patch, test, branch_name):
    try:
    main_branch = self.repo.get_branch("main")
    new_branch = self.repo.create_git_ref(ref=f'refs/heads/{branch_name}', sha=main_branch.commit.sha)

    patch_file = self.repo.get_contents("patch.py", ref=branch_name)
    test_file = self.repo.get_contents("test_patch.py", ref=branch_name)

    self.repo.update_file(patch_file.path, "Applying patch", patch, patch_file.sha, branch=branch_name)
    self.repo.update_file(test_file.path, "Adding test", test, test_file.sha, branch=branch_name)

    pr = self.repo.create_pull(title=f"Patch for {branch_name}",
    body="Automated patch and test submission",
    head=branch_name,
    base="main")
    self.logger.info(f"Pull request created: {pr.html_url}")
    return pr.html_url
    except Exception as e:
    self.logger.error(f"Error creating PR: {str(e)}")
    return None

    async def process_vulnerability(self, issue):
    patch, test = await self.request_patch(issue)
    if patch and test:
    self.store_patch(issue, patch, test, "generated")
    branch_name = f"patch-{issue.replace(' ', '-')}"
    pr_url = self.create_pull_request(patch, test, branch_name)
    if pr_url:
    self.store_patch(issue, patch, test, "PR_created")
    self.notify_slack(f"PR created: {pr_url}")
    else:
    self.notify_slack(f"Failed to create PR for {issue}")
    self.store_patch(issue, patch, test, "failed_PR_creation")
    else:
    self.notify_slack(f"Failed to generate patch for {issue}")
    self.store_patch(issue, None, None, "generation_failed")

    def close(self):
    self.conn.close()
    asyncio.run(self.close_websocket())
    self.logger.info("Database and WebSocket connections closed")

    # Example usage
    if __name__ == "__main__":
    cicd = SecureCICD("github_token", "repo_name", "slack_token", "slack_channel", "ws://ollama.local")
    asyncio.run(cicd.process_vulnerability("Critical security issue in auth module"))
    cicd.close()