#!/usr/bin/env python3
"""
Gemini AI Integration MCP Server
Provides development workflow automation with AI second opinions
"""

import asyncio
import json
import logging
import os
import sys
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional
from urllib.parse import urlencode

import mcp.server.stdio
import mcp.types as types
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse, RedirectResponse, Response, StreamingResponse
from mcp.server import InitializationOptions, NotificationOptions, Server
from pydantic import BaseModel

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _utc_now_iso() -> str:
    """Return the current UTC time as a naive ISO-8601 string.

    The output has the same shape as ``datetime.utcnow().isoformat()`` (no
    UTC offset suffix) but is derived from an aware datetime, avoiding the
    deprecated ``utcnow`` call.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()


def _utc_epoch_seconds() -> int:
    """Return the current Unix timestamp in whole seconds.

    ``datetime.utcnow().timestamp()`` interprets the naive value as *local*
    time and is therefore off by the UTC offset on non-UTC hosts; an aware
    datetime does not have that problem.
    """
    return int(datetime.now(timezone.utc).timestamp())


def check_container_environment() -> bool:
    """Check if running in a container.

    Returns:
        True when ``/.dockerenv`` exists or the ``CONTAINER_ENV`` environment
        variable is set to a non-empty value, False otherwise.
    """
    return os.path.exists("/.dockerenv") or bool(os.environ.get("CONTAINER_ENV"))


def setup_logging(name: str):
    """Setup logging for the server.

    Args:
        name: Name of the logger to configure.

    Returns:
        The ``logging.Logger`` for ``name``, set to INFO level with a
        formatted console handler attached.
    """
    named_logger = logging.getLogger(name)
    named_logger.setLevel(logging.INFO)

    # Loggers are process-wide singletons: only attach the console handler once,
    # otherwise every additional call would duplicate each emitted line.
    if not named_logger.handlers:
        console = logging.StreamHandler()
        console.setLevel(logging.INFO)
        console.setFormatter(logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s"))
        named_logger.addHandler(console)

    return named_logger


class ToolRequest(BaseModel):
    """Model for tool execution requests"""

    tool: str
    arguments: Optional[Dict[str, Any]] = None
    parameters: Optional[Dict[str, Any]] = None
    client_id: Optional[str] = None

    def get_args(self) -> Dict[str, Any]:
        """Get arguments, supporting both 'arguments' and 'parameters' fields"""
        return self.arguments or self.parameters or {}


class ToolResponse(BaseModel):
    """Model for tool execution responses"""

    success: bool
    result: Any
    error: Optional[str] = None


class GeminiMCPServer:
    """MCP Server for Gemini AI integration and consultation.

    Exposes the tools described by :meth:`get_tools` over HTTP (FastAPI routes,
    JSON-RPC on ``/messages`` and ``/mcp``) and over stdio (:meth:`run_stdio`).
    The OAuth-style endpoints are deliberate no-auth bypasses.
    """

    def __init__(self, project_root: Optional[str] = None):
        """Create the server, its FastAPI app and the Gemini integration.

        Args:
            project_root: Directory searched for ``.env`` and
                ``gemini-config.json``; defaults to the current working directory.

        Note:
            Calls ``sys.exit(1)`` when :func:`check_container_environment`
            reports that the process runs inside a container.
        """
        # Check if running in container and exit if true
        if check_container_environment():
            print(
                "ERROR: Gemini MCP Server cannot run inside a container!",
                file=sys.stderr,
            )
            print(
                "The Gemini CLI requires Docker access and must run on the host system.",
                file=sys.stderr,
            )
            print("Please launch this server directly on the host with:", file=sys.stderr)
            print(" python gemini_mcp_server.py", file=sys.stderr)
            sys.exit(1)

        # Initialize base server attributes
        self.name = "Gemini MCP Server"
        self.version = "1.0.0"
        self.port = 8006  # Standard Gemini MCP port
        self.logger = setup_logging("GeminiMCP")
        self.app = FastAPI(title=self.name, version=self.version)
        self._setup_routes()
        self._setup_events()

        self.project_root = Path(project_root) if project_root else Path.cwd()

        # Initialize Gemini integration
        self.gemini_config = self._load_gemini_config()
        self.gemini = self._initialize_gemini()

        # Track uncertainty for auto-consultation
        self.last_response_uncertainty = None

        # Protocol version requested by the most recent JSON-RPC "initialize"
        self._protocol_version: Optional[str] = None

    def _setup_events(self):
        """Setup startup/shutdown events"""

        @self.app.on_event("startup")
        async def startup_event():
            self.logger.info(f"{self.name} starting on port {self.port}")
            self.logger.info(f"Server version: {self.version}")
            self.logger.info("Server initialized successfully")

    def _setup_routes(self):
        """Setup common HTTP routes"""
        app = self.app

        # Plain REST-style endpoints
        app.get("/health")(self.health_check)
        app.get("/mcp/tools")(self.list_tools)
        app.post("/mcp/execute")(self.execute_tool)
        app.post("/mcp/register")(self.register_client)

        # OAuth-style endpoints (all of them bypass real authentication)
        app.post("/register")(self.register_client_oauth)
        app.post("/oauth/register")(self.register_client_oauth)
        app.get("/authorize")(self.oauth_authorize_bypass)
        app.post("/authorize")(self.oauth_authorize_bypass)
        app.get("/oauth/authorize")(self.oauth_authorize_bypass)
        app.post("/oauth/authorize")(self.oauth_authorize_bypass)
        app.post("/token")(self.oauth_token_bypass)
        app.post("/oauth/token")(self.oauth_token_bypass)

        # Client bookkeeping and statistics
        app.get("/mcp/clients")(self.list_clients)
        app.get("/mcp/clients/{client_id}")(self.get_client_info)
        app.get("/mcp/stats")(self.get_stats)

        # Discovery documents
        app.get("/.well-known/oauth-authorization-server")(self.oauth_discovery)
        app.get("/.well-known/oauth-authorization-server/mcp")(self.oauth_discovery)
        app.get("/.well-known/oauth-authorization-server/messages")(self.oauth_discovery)
        app.get("/.well-known/oauth-protected-resource")(self.oauth_protected_resource)
        app.get("/.well-known/mcp")(self.mcp_discovery)

        # MCP session and transport endpoints
        app.post("/mcp/initialize")(self.mcp_initialize)
        app.get("/mcp/capabilities")(self.mcp_capabilities)
        app.get("/messages")(self.handle_messages_get)
        app.post("/messages")(self.handle_messages)
        app.get("/mcp")(self.handle_mcp_get)
        app.post("/mcp")(self.handle_jsonrpc)
        app.options("/mcp")(self.handle_options)
        app.post("/mcp/rpc")(self.handle_jsonrpc)
        app.get("/mcp/sse")(self.handle_mcp_sse)

    async def health_check(self):
        """Health check endpoint"""
        return {"status": "healthy", "server": self.name, "version": self.version}

    async def register_client(self, request: Dict[str, Any]):
        """Register a client - simplified for home lab use.

        Nothing is stored; the reply simply echoes the client name/id taken
        from the request body (``client`` or ``client_name``, ``client_id``).
        """
        client_name = request.get("client", request.get("client_name", "unknown"))
        client_id = request.get("client_id", f"{client_name}_simple")

        self.logger.info(f"Client registration request from: {client_name}")

        now_iso = _utc_now_iso()
        return {
            "status": "registered",
            "client": client_name,
            "client_id": client_id,
            "server": self.name,
            "version": self.version,
            "registration": {
                "client_id": client_id,
                "client_name": client_name,
                "registered": True,
                "is_update": False,
                "registration_time": now_iso,
                "server_time": now_iso,
            },
        }

    async def register_client_oauth(self, request_data: Dict[str, Any], request: Request):
        """OAuth2-style client registration - simplified for home lab use.

        Returns a dynamic-client-registration style document derived from the
        request body; no client is persisted and no secret is issued.
        """
        redirect_uris = request_data.get("redirect_uris", [])
        client_name = request_data.get("client_name", request_data.get("client", "claude-code"))
        client_id = f"{client_name}_oauth"

        self.logger.info(f"OAuth registration request from: {client_name}")

        return {
            "client_id": client_id,
            "client_name": client_name,
            "redirect_uris": redirect_uris if redirect_uris else ["http://localhost"],
            "grant_types": request_data.get("grant_types", ["authorization_code"]),
            "response_types": request_data.get("response_types", ["code"]),
            "token_endpoint_auth_method": request_data.get("token_endpoint_auth_method", "none"),
            "registration_access_token": "not-required-for-local-mcp",
            "registration_client_uri": f"{request.url.scheme}://{request.url.netloc}/mcp/clients/{client_id}",
            # Real Unix epoch seconds (independent of the host's local timezone)
            "client_id_issued_at": _utc_epoch_seconds(),
            "client_secret_expires_at": 0,
        }

    async def oauth_authorize_bypass(self, request: Request):
        """Bypass OAuth2 authorization - immediately approve without auth.

        Redirects (302) to the ``redirect_uri`` query parameter with a fixed
        ``code`` and, when supplied, the caller's ``state`` appended.
        """
        params = dict(request.query_params)
        redirect_uri = params.get("redirect_uri", "http://localhost")
        state = params.get("state", "")

        query = {"code": "bypass-auth-code-no-auth-required"}
        if state:
            query["state"] = state

        # urlencode() percent-encodes the client-supplied state so characters
        # such as '&', '#' or spaces cannot corrupt or inject query parameters.
        separator = "&" if "?" in redirect_uri else "?"
        redirect_url = f"{redirect_uri}{separator}{urlencode(query)}"

        return RedirectResponse(url=redirect_url, status_code=302)

    async def oauth_token_bypass(self, request: Request):
        """Bypass OAuth2 token exchange - immediately return access token.

        The request body (JSON or form encoded) is only logged; an unreadable
        body is treated as empty.
        """
        try:
            if request.headers.get("content-type", "").startswith("application/json"):
                request_data = await request.json()
            else:
                request_data = dict(await request.form())
        except Exception:
            # Best effort: the body is informational only, the token is static.
            request_data = {}

        self.logger.info(f"Token request data: {request_data}")

        return {
            "access_token": "bypass-token-no-auth-required",
            "token_type": "Bearer",
            "expires_in": 31536000,
            "scope": "full_access",
            "refresh_token": "bypass-refresh-token-no-auth-required",
        }

    async def oauth_discovery(self, request: Request):
        """OAuth 2.0 authorization server metadata"""
        base_url = f"{request.url.scheme}://{request.url.netloc}"
        return {
            "issuer": base_url,
            "authorization_endpoint": f"{base_url}/authorize",
            "token_endpoint": f"{base_url}/token",
            "registration_endpoint": f"{base_url}/register",
            "token_endpoint_auth_methods_supported": ["none"],
            "response_types_supported": ["code"],
            "grant_types_supported": ["authorization_code"],
            "code_challenge_methods_supported": ["S256"],
            "registration_endpoint_auth_methods_supported": ["none"],
        }

    async def oauth_protected_resource(self, request: Request):
        """OAuth 2.0 protected resource metadata"""
        base_url = f"{request.url.scheme}://{request.url.netloc}"
        return {
            "resource": f"{base_url}/messages",
            "authorization_servers": [base_url],
        }

    async def handle_mcp_get(self, request: Request):
        """Handle GET requests to /mcp endpoint for SSE streaming.

        Emits one ``connection`` event followed by a ``ping`` event every 15
        seconds. The session id comes from the ``Mcp-Session-Id`` request
        header or is freshly generated.
        """
        session_id = request.headers.get("Mcp-Session-Id", str(uuid.uuid4()))

        async def event_generator():
            connection_data = {
                "type": "connection",
                "sessionId": session_id,
                "status": "connected",
            }
            yield f"data: {json.dumps(connection_data)}\n\n"

            # Keep-alive loop; runs until the stream is torn down.
            while True:
                await asyncio.sleep(15)
                ping_data = {"type": "ping", "timestamp": _utc_now_iso()}
                yield f"data: {json.dumps(ping_data)}\n\n"

        return StreamingResponse(
            event_generator(),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "X-Accel-Buffering": "no",
                "Mcp-Session-Id": session_id,
            },
        )

    async def handle_mcp_sse(self, request: Request):
        """Handle SSE requests for authenticated clients.

        Raises:
            HTTPException: 401 when the ``Authorization`` header does not
                start with ``Bearer ``. The token value itself is not checked.
        """
        auth_header = request.headers.get("authorization", "")
        if not auth_header.startswith("Bearer "):
            raise HTTPException(status_code=401, detail="Unauthorized")

        async def event_generator():
            yield f"data: {json.dumps({'type': 'connected', 'message': 'SSE connection established'})}\n\n"

            # Keep-alive loop; runs until the stream is torn down.
            while True:
                await asyncio.sleep(30)
                yield f"data: {json.dumps({'type': 'ping'})}\n\n"

        return StreamingResponse(
            event_generator(),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "X-Accel-Buffering": "no",
            },
        )

    async def handle_messages_get(self, request: Request):
        """Handle GET requests to /messages endpoint"""
        return {
            "protocol": "mcp",
            "version": "1.0",
            "server": {
                "name": self.name,
                "version": self.version,
                "description": f"{self.name} MCP Server",
            },
            "auth": {
                "required": False,
                "type": "none",
            },
            "transport": {
                "type": "streamable-http",
                "endpoint": "/messages",
            },
        }

    async def handle_messages(self, request: Request):
        """Handle POST requests to /messages endpoint (HTTP Stream Transport).

        The body is a single JSON-RPC request or a batch (list). With the
        ``Mcp-Response-Mode: stream`` header the replies are sent as SSE
        events; otherwise they are returned as JSON. A body consisting only of
        notifications yields ``202`` with no content. Any exception raised
        while reading or handling the body is reported as a JSON-RPC
        ``-32700`` error with HTTP status 400.
        """
        session_id = request.headers.get("Mcp-Session-Id")
        response_mode = request.headers.get("Mcp-Response-Mode", "batch").lower()
        protocol_version = request.headers.get("MCP-Protocol-Version")

        self.logger.info(f"Messages request headers: {dict(request.headers)}")
        self.logger.info(
            f"Session ID: {session_id}, Response Mode: {response_mode}, Protocol Version: {protocol_version}"
        )

        try:
            body = await request.json()
            self.logger.info(f"Messages request body: {json.dumps(body)}")

            # An "initialize" request without a session header starts a new session.
            is_init_request = isinstance(body, dict) and body.get("method") == "initialize"
            if is_init_request and not session_id:
                session_id = str(uuid.uuid4())
                self.logger.info(f"Generated new session ID: {session_id}")

            session_headers = {"Mcp-Session-Id": session_id or ""}
            json_headers = {"Content-Type": "application/json", **session_headers}

            if response_mode == "stream":

                async def event_generator():
                    if session_id:
                        yield f"data: {json.dumps({'type': 'session', 'sessionId': session_id})}\n\n"

                    for req in body if isinstance(body, list) else [body]:
                        response = await self._process_jsonrpc_request(req)
                        if response:
                            yield f"data: {json.dumps(response)}\n\n"

                    yield f"data: {json.dumps({'type': 'completion'})}\n\n"

                return StreamingResponse(
                    event_generator(),
                    media_type="text/event-stream",
                    headers={
                        "Cache-Control": "no-cache",
                        "Connection": "keep-alive",
                        "X-Accel-Buffering": "no",
                        **session_headers,
                    },
                )

            if isinstance(body, list):
                responses = []
                has_notifications = False
                for req in body:
                    response = await self._process_jsonrpc_request(req)
                    if response is None:
                        has_notifications = True
                    else:
                        responses.append(response)

                # A batch made up solely of notifications has nothing to return.
                if not responses and has_notifications:
                    return Response(status_code=202, headers=session_headers)

                return JSONResponse(content=responses, headers=json_headers)

            response = await self._process_jsonrpc_request(body)
            if response is None:
                return Response(status_code=202, headers=session_headers)

            if is_init_request and session_id:
                self.logger.info(f"Returning session ID in response: {session_id}")

            return JSONResponse(content=response, headers=json_headers)
        except Exception as e:
            self.logger.error(f"Messages endpoint error: {e}")
            return JSONResponse(
                content={
                    "jsonrpc": "2.0",
                    "error": {"code": -32700, "message": "Parse error", "data": str(e)},
                    "id": None,
                },
                status_code=400,
                headers={
                    "Content-Type": "application/json",
                    "Mcp-Session-Id": session_id or "",
                },
            )

    async def handle_jsonrpc(self, request: Request):
        """Handle JSON-RPC 2.0 requests for MCP protocol"""
        return await self.handle_messages(request)

    async def handle_options(self, request: Request):
        """Handle OPTIONS requests for CORS preflight"""
        return Response(
            content="",
            headers={
                "Access-Control-Allow-Origin": "*",
                "Access-Control-Allow-Methods": "GET, POST, OPTIONS",
                "Access-Control-Allow-Headers": "Content-Type, Authorization, Mcp-Session-Id, Mcp-Response-Mode",
                "Access-Control-Max-Age": "86400",
            },
        )

    async def _process_jsonrpc_request(self, request: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Process a single JSON-RPC request.

        Args:
            request: Decoded JSON-RPC message.

        Returns:
            The JSON-RPC response object, or ``None`` when the message is a
            notification (it carries no ``id``) and therefore gets no reply.
        """
        # A batch may contain arbitrary JSON; anything that is not an object is
        # answered per JSON-RPC 2.0 with "Invalid Request" instead of crashing.
        if not isinstance(request, dict):
            return {
                "jsonrpc": "2.0",
                "error": {"code": -32600, "message": "Invalid Request"},
                "id": None,
            }

        jsonrpc = request.get("jsonrpc", "2.0")
        method = request.get("method")
        # An explicit `"params": null` is treated like omitted params.
        params = request.get("params") or {}
        req_id = request.get("id")

        self.logger.info(f"JSON-RPC request: method={method}, id={req_id}")

        is_notification = req_id is None

        try:
            if method == "initialize":
                result = await self._jsonrpc_initialize(params)
            elif method in ("initialized", "notifications/initialized"):
                self.logger.info("Client sent initialized notification")
                if is_notification:
                    return None
                result = {"status": "acknowledged"}
            elif method == "tools/list":
                result = await self._jsonrpc_list_tools(params)
            elif method == "tools/call":
                result = await self._jsonrpc_call_tool(params)
            elif method == "completion/complete":
                result = {"error": "Completions not supported"}
            elif method == "ping":
                result = {"pong": True}
            else:
                # Unknown notifications are dropped silently; unknown requests get -32601.
                if is_notification:
                    return None
                return {
                    "jsonrpc": jsonrpc,
                    "error": {
                        "code": -32601,
                        "message": f"Method not found: {method}",
                    },
                    "id": req_id,
                }

            if is_notification:
                return None

            response = {"jsonrpc": jsonrpc, "result": result, "id": req_id}
            self.logger.info(f"JSON-RPC response: {json.dumps(response)}")

            if method == "initialize" and "protocolVersion" in result:
                self.logger.info("Initialization complete, ready for tools/list request")
                self.logger.info("Expecting client to send 'tools/list' request next")

            return response
        except Exception as e:
            self.logger.error(f"Error processing method {method}: {e}")
            if is_notification:
                return None
            return {
                "jsonrpc": jsonrpc,
                "error": {
                    "code": -32603,
                    "message": "Internal error",
                    "data": str(e),
                },
                "id": req_id,
            }

    async def _jsonrpc_initialize(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Handle initialize request.

        Echoes the client's requested ``protocolVersion`` (default
        ``2024-11-05``) and remembers it in ``self._protocol_version``.
        """
        client_info = params.get("clientInfo", {})
        protocol_version = params.get("protocolVersion", "2024-11-05")

        self.logger.info(f"Client info: {client_info}, requested protocol: {protocol_version}")

        self._protocol_version = protocol_version

        return {
            "protocolVersion": protocol_version,
            "serverInfo": {"name": self.name, "version": self.version},
            "capabilities": {
                "tools": {"listChanged": True},
                "resources": {},
                "prompts": {},
            },
        }

    async def _jsonrpc_list_tools(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Handle tools/list request"""
        tools = self.get_tools()
        self.logger.info(f"Available tools from get_tools(): {list(tools.keys())}")

        tool_list = [
            {
                "name": tool_name,
                "description": tool_info.get("description", ""),
                "inputSchema": tool_info.get("parameters", {}),
            }
            for tool_name, tool_info in tools.items()
        ]

        self.logger.info(f"Returning {len(tool_list)} tools to client")
        return {"tools": tool_list}

    async def _jsonrpc_call_tool(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Handle tools/call request.

        Args:
            params: JSON-RPC params with ``name`` and optional ``arguments``.

        Returns:
            An MCP content result; tool failures are reported in-band with
            ``isError: True`` rather than raised.

        Raises:
            ValueError: If the tool name is missing, unknown, or has no
                matching method on this server.
        """
        tool_name = params.get("name")
        # `"arguments": null` must not reach `**arguments`.
        arguments = params.get("arguments") or {}

        if not tool_name:
            raise ValueError("Tool name is required")

        # Only names advertised by get_tools() may be resolved via getattr.
        if tool_name not in self.get_tools():
            raise ValueError(f"Tool '{tool_name}' not found")

        tool_func = getattr(self, tool_name, None)
        if not tool_func:
            raise ValueError(f"Tool '{tool_name}' not implemented")

        try:
            result = await tool_func(**arguments)

            if isinstance(result, dict):
                content_text = json.dumps(result, indent=2)
            else:
                content_text = str(result)

            return {"content": [{"type": "text", "text": content_text}]}
        except Exception as e:
            self.logger.error(f"Error calling tool {tool_name}: {e}")
            return {
                "content": [{"type": "text", "text": f"Error executing {tool_name}: {str(e)}"}],
                "isError": True,
            }

    async def mcp_discovery(self):
        """MCP protocol discovery endpoint"""
        return {
            "mcp_version": "1.0",
            "server_name": self.name,
            "server_version": self.version,
            "capabilities": {
                "tools": True,
                "prompts": False,
                "resources": False,
            },
            "endpoints": {
                "tools": "/mcp/tools",
                "execute": "/mcp/execute",
                "initialize": "/mcp/initialize",
                "capabilities": "/mcp/capabilities",
            },
        }

    async def mcp_info(self):
        """MCP server information"""
        return {
            "protocol": "mcp",
            "version": "1.0",
            "server": {
                "name": self.name,
                "version": self.version,
                "description": f"{self.name} MCP Server",
            },
            "auth": {
                "required": False,
                "type": "none",
            },
        }

    async def mcp_initialize(self, request: Dict[str, Any]):
        """Initialize MCP session.

        The session id is built from the client name in the request body and
        the current Unix timestamp; no session state is stored.
        """
        client_info = request.get("client", {})
        return {
            "session_id": f"session-{client_info.get('name', 'unknown')}-{_utc_epoch_seconds()}",
            "server": {
                "name": self.name,
                "version": self.version,
            },
            "capabilities": {
                "tools": True,
                "prompts": False,
                "resources": False,
            },
        }

    async def mcp_capabilities(self):
        """Return server capabilities"""
        tools = self.get_tools()
        return {
            "capabilities": {
                "tools": {
                    "list": list(tools.keys()),
                    "count": len(tools),
                },
                "prompts": {
                    "supported": False,
                },
                "resources": {
                    "supported": False,
                },
            },
        }

    async def list_tools(self):
        """List available tools"""
        return {
            "tools": [
                {
                    "name": tool_name,
                    "description": tool_info.get("description", ""),
                    "parameters": tool_info.get("parameters", {}),
                }
                for tool_name, tool_info in self.get_tools().items()
            ]
        }

    async def execute_tool(self, request: ToolRequest):
        """Execute a tool with given arguments.

        Returns:
            ``ToolResponse`` with ``success=True`` and the tool result, or
            ``success=False`` and the error text when the tool itself fails.

        Raises:
            HTTPException: 404 for a tool not listed by :meth:`get_tools`,
                501 for a listed tool without a matching method.
        """
        try:
            if request.tool not in self.get_tools():
                raise HTTPException(status_code=404, detail=f"Tool '{request.tool}' not found")

            tool_func = getattr(self, request.tool, None)
            if not tool_func:
                raise HTTPException(status_code=501, detail=f"Tool '{request.tool}' not implemented")

            result = await tool_func(**request.get_args())

            return ToolResponse(success=True, result=result)
        except HTTPException:
            # Let FastAPI turn these into the intended 404/501 status codes
            # instead of masking them behind a 200 "success: false" body.
            raise
        except Exception as e:
            self.logger.error(f"Error executing tool {request.tool}: {str(e)}")
            return ToolResponse(success=False, result=None, error=str(e))

    async def list_clients(self, active_only: bool = True):
        """List clients - returns empty for home lab use"""
        return {"clients": [], "count": 0, "active_only": active_only}

    async def get_client_info(self, client_id: str):
        """Get client info - returns simple response for home lab use"""
        return {
            "client_id": client_id,
            "client_name": client_id.replace("_oauth", "").replace("_simple", ""),
            "active": True,
            "registered_at": _utc_now_iso(),
        }

    async def get_stats(self):
        """Get server statistics - simplified for home lab use"""
        return {
            "server": {
                "name": self.name,
                "version": self.version,
                "tools_count": len(self.get_tools()),
            },
            "clients": {
                "total_clients": 0,
                "active_clients": 0,
                "inactive_clients": 0,
                "clients_active_last_hour": 0,
                "total_requests": 0,
            },
        }

    def _env_number(self, name: str, default: Any, cast: Callable[[str], Any]) -> Any:
        """Read a numeric environment variable, falling back to ``default``.

        A value that ``cast`` rejects is logged and ignored so a typo in the
        environment cannot crash server construction.
        """
        raw = os.getenv(name)
        if raw is None:
            return default
        try:
            return cast(raw)
        except ValueError:
            self.logger.warning(f"Invalid value for {name}: {raw!r}; using default {default}")
            return default

    def _load_gemini_config(self) -> Dict[str, Any]:
        """Load Gemini configuration from environment or config file.

        Order of precedence (lowest to highest): built-in defaults, variables
        from ``<project_root>/.env`` (only those not already set in the process
        environment), the process environment, and finally the keys found in
        ``<project_root>/gemini-config.json``.

        Returns:
            The merged configuration dictionary.
        """
        # Try to load .env file if it exists
        env_file = self.project_root / ".env"
        if env_file.exists():
            try:
                with open(env_file, "r", encoding="utf-8") as f:
                    for line in f:
                        line = line.strip()
                        if not line or line.startswith("#") or "=" not in line:
                            continue
                        key, _, value = line.partition("=")
                        key = key.strip()
                        value = value.strip()
                        # Drop one pair of matching surrounding quotes: KEY="value"
                        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
                            value = value[1:-1]
                        # Only set if not already in environment
                        if key and key not in os.environ:
                            os.environ[key] = value
            except Exception as e:
                self.logger.warning(f"Could not load .env file: {e}")

        def flag(name: str, default: str) -> bool:
            return os.getenv(name, default).lower() == "true"

        config = {
            "enabled": flag("GEMINI_ENABLED", "true"),
            "auto_consult": flag("GEMINI_AUTO_CONSULT", "true"),
            "cli_command": os.getenv("GEMINI_CLI_COMMAND", "gemini"),
            "timeout": self._env_number("GEMINI_TIMEOUT", 60, int),
            "rate_limit_delay": self._env_number("GEMINI_RATE_LIMIT", 2.0, float),
            "max_context_length": self._env_number("GEMINI_MAX_CONTEXT", 4000, int),
            "log_consultations": flag("GEMINI_LOG_CONSULTATIONS", "true"),
            "model": os.getenv("GEMINI_MODEL", "gemini-2.5-flash"),
            "sandbox_mode": flag("GEMINI_SANDBOX", "false"),
            "debug_mode": flag("GEMINI_DEBUG", "false"),
            "include_history": flag("GEMINI_INCLUDE_HISTORY", "true"),
            "max_history_entries": self._env_number("GEMINI_MAX_HISTORY", 10, int),
        }

        # Try to load from config file
        config_file = self.project_root / "gemini-config.json"
        if config_file.exists():
            try:
                with open(config_file, "r", encoding="utf-8") as f:
                    file_config = json.load(f)
                config.update(file_config)
            except Exception as e:
                self.logger.warning(f"Could not load gemini-config.json: {e}")

        return config

    def _initialize_gemini(self):
        """Initialize Gemini integration with lazy loading.

        Returns:
            The object produced by ``gemini_integration.get_integration`` or,
            when that module cannot be imported, a stand-in that reports the
            integration as disabled.
        """
        try:
            from gemini_integration import get_integration

            return get_integration(self.gemini_config)
        except ImportError as e:
            self.logger.error(f"Failed to import Gemini integration: {e}")

            # Return a mock object that always returns disabled status
            class MockGemini:
                def __init__(self):
                    self.auto_consult = False
                    self.enabled = False

                async def consult_gemini(self, **kwargs):
                    return {
                        "status": "disabled",
                        "error": "Gemini integration not available",
                    }

                def clear_conversation_history(self):
                    return {"message": "Gemini integration not available"}

                def get_statistics(self):
                    return {}

            return MockGemini()

    def get_tools(self) -> Dict[str, Dict[str, Any]]:
        """Return available Gemini tools.

        Each key is both the public tool name and the name of the coroutine
        method on this class that implements it.
        """
        return {
            "consult_gemini": {
                "description": "Consult Gemini AI for a second opinion or validation",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "The question or code to consult Gemini about",
                        },
                        "context": {
                            "type": "string",
                            "description": "Additional context for the consultation",
                        },
                        "comparison_mode": {
                            "type": "boolean",
                            "default": True,
                            "description": "Compare with previous Claude response",
                        },
                        "force": {
                            "type": "boolean",
                            "default": False,
                            "description": "Force consultation even if disabled",
                        },
                    },
                    "required": ["query"],
                },
            },
            "clear_gemini_history": {
                "description": "Clear Gemini conversation history",
                "parameters": {"type": "object", "properties": {}},
            },
            "gemini_status": {
                "description": "Get Gemini integration status and statistics",
                "parameters": {"type": "object", "properties": {}},
            },
            "toggle_gemini_auto_consult": {
                "description": "Toggle automatic Gemini consultation on uncertainty detection",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "enable": {
                            "type": "boolean",
                            "description": "Enable or disable auto-consultation",
                        }
                    },
                },
            },
        }

    async def consult_gemini(
        self,
        query: str,
        context: str = "",
        comparison_mode: bool = True,
        force: bool = False,
    ) -> Dict[str, Any]:
        """Consult Gemini AI for a second opinion

        Args:
            query: The question or code to consult about
            context: Additional context
            comparison_mode: Compare with previous Claude response
            force: Force consultation even if disabled

        Returns:
            Dictionary with consultation results
        """
        if not query:
            return {
                "success": False,
                "error": "'query' parameter is required for Gemini consultation",
            }

        # Consult Gemini
        result = await self.gemini.consult_gemini(
            query=query,
            context=context,
            comparison_mode=comparison_mode,
            force_consult=force,
        )

        # Format the response
        formatted_response = self._format_gemini_response(result)

        return {
            "success": result.get("status") == "success",
            "result": formatted_response,
            "raw_result": result,
        }

    async def clear_gemini_history(self) -> Dict[str, Any]:
        """Clear Gemini conversation history"""
        result = self.gemini.clear_conversation_history()
        return {"success": True, "message": result.get("message", "History cleared")}

    async def gemini_status(self) -> Dict[str, Any]:
        """Get Gemini integration status and statistics"""
        stats = self.gemini.get_statistics() if hasattr(self.gemini, "get_statistics") else {}

        status_info = {
            "enabled": getattr(self.gemini, "enabled", False),
            "auto_consult": getattr(self.gemini, "auto_consult", False),
            "model": self.gemini_config.get("model", "unknown"),
            "timeout": self.gemini_config.get("timeout", 60),
            "statistics": stats,
        }

        return {"success": True, "status": status_info}

    async def toggle_gemini_auto_consult(self, enable: Optional[bool] = None) -> Dict[str, Any]:
        """Toggle automatic Gemini consultation

        Args:
            enable: True to enable, False to disable, None to toggle

        Returns:
            Dictionary with new status
        """
        if enable is None:
            # Toggle current state
            self.gemini.auto_consult = not getattr(self.gemini, "auto_consult", False)
        else:
            self.gemini.auto_consult = bool(enable)

        status = "enabled" if self.gemini.auto_consult else "disabled"
        return {
            "success": True,
            "status": status,
            "message": f"Gemini auto-consultation is now {status}",
        }

    def _format_gemini_response(self, result: Dict[str, Any]) -> str:
        """Format Gemini consultation response.

        Args:
            result: Raw consultation result; its ``status`` key selects the
                layout (``success``, ``disabled``, ``timeout``; anything else,
                including a missing status, is rendered as an error).

        Returns:
            A multi-line, human-readable summary.
        """
        # .get(): a result without "status" is rendered as an error, not a KeyError.
        status = result.get("status")

        output_lines = [
            "🤖 Gemini Consultation Response",
            "=" * 40,
            "",
        ]

        if status == "success":
            output_lines.append(f"✅ Consultation ID: {result.get('consultation_id', 'N/A')}")
            output_lines.append(f"⏱️ Execution time: {result.get('execution_time', 0):.2f}s")
            output_lines.append("")

            # Display the raw response
            response = result.get("response", "")
            if response:
                output_lines.append("📄 Response:")
                output_lines.append(response)
        elif status == "disabled":
            output_lines.append("ℹ️ Gemini consultation is currently disabled")
            output_lines.append("💡 Enable with: toggle_gemini_auto_consult")
        elif status == "timeout":
            output_lines.append(f"❌ {result.get('error', 'Timeout error')}")
            output_lines.append("💡 Try increasing the timeout or simplifying the query")
        else:  # error
            output_lines.append(f"❌ Error: {result.get('error', 'Unknown error')}")
            output_lines.append("")
            output_lines.append("💡 Troubleshooting:")
            output_lines.append(" 1. Check if Gemini CLI is installed and in PATH")
            output_lines.append(" 2. Verify Gemini CLI authentication")
            output_lines.append(" 3. Check the logs for more details")

        return "\n".join(output_lines)

    async def run_stdio(self):
        """Run the server in stdio mode (for Claude desktop app)"""
        server = Server(self.name)

        # Store tools and their functions for later access
        self._tools = self.get_tools()
        self._tool_funcs = {}
        for tool_name in self._tools:
            tool_func = getattr(self, tool_name, None)
            if tool_func:
                self._tool_funcs[tool_name] = tool_func

        @server.list_tools()
        async def list_tools() -> List[types.Tool]:
            """List available tools"""
            return [
                types.Tool(
                    name=tool_name,
                    description=tool_info.get("description", ""),
                    inputSchema=tool_info.get("parameters", {}),
                )
                for tool_name, tool_info in self._tools.items()
            ]

        @server.call_tool()
        async def call_tool(name: str, arguments: Dict[str, Any]) -> List[types.TextContent]:
            """Call a tool with given arguments"""
            if name not in self._tool_funcs:
                return [types.TextContent(type="text", text=f"Tool '{name}' not found")]

            try:
                # Call the tool function (tolerate a missing/None argument map)
                result = await self._tool_funcs[name](**(arguments or {}))

                # Convert result to MCP response format
                if isinstance(result, dict):
                    return [types.TextContent(type="text", text=json.dumps(result, indent=2))]
                return [types.TextContent(type="text", text=str(result))]
            except Exception as e:
                self.logger.error(f"Error calling tool {name}: {str(e)}")
                return [types.TextContent(type="text", text=f"Error: {str(e)}")]

        # Run the stdio server
        async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
            await server.run(
                read_stream,
                write_stream,
                InitializationOptions(
                    server_name=self.name,
                    server_version=self.version,
                    capabilities=server.get_capabilities(
                        notification_options=NotificationOptions(),
                        experimental_capabilities={},
                    ),
                ),
            )

    def run_http(self):
        """Run the server in HTTP mode.

        NOTE(review): binds to all interfaces while every auth endpoint is a
        bypass - confirm this is only ever exposed on a trusted network.
        """
        # Imported lazily so stdio mode does not require uvicorn.
        import uvicorn

        uvicorn.run(self.app, host="0.0.0.0", port=self.port)

    def run(self, mode: str = "http"):
        """Run the server in specified mode.

        Args:
            mode: ``"stdio"`` or ``"http"``.

        Raises:
            ValueError: If ``mode`` is neither of the two.
        """
        if mode == "stdio":
            asyncio.run(self.run_stdio())
        elif mode == "http":
            self.run_http()
        else:
            raise ValueError(f"Unknown mode: {mode}. Use 'stdio' or 'http'.")


def main():
    """Run the Gemini MCP Server"""
    import argparse

    parser = argparse.ArgumentParser(description="Gemini AI Integration MCP Server")
    parser.add_argument(
        "--mode",
        choices=["http", "stdio"],
        default="stdio",  # Default to stdio for Gemini
        help="Server mode (http or stdio)",
    )
    parser.add_argument("--project-root", default=None, help="Project root directory")
    args = parser.parse_args()

    server = GeminiMCPServer(project_root=args.project_root)
    server.run(mode=args.mode)


if __name__ == "__main__":
    main()