Skip to content

Instantly share code, notes, and snippets.

@relentlessmiles
Forked from AndrewAltimit/!README.md
Created August 21, 2025 07:22
Show Gist options
  • Save relentlessmiles/29ac3f6c34a6c5e51a2a43986a07925b to your computer and use it in GitHub Desktop.
Save relentlessmiles/29ac3f6c34a6c5e51a2a43986a07925b to your computer and use it in GitHub Desktop.
Claude Code and Gemini CLI Integration

Gemini AI Integration MCP Server

A Model Context Protocol (MCP) server that integrates Google's Gemini AI for code review, technical consultation, and AI-assisted development workflows. This server provides seamless integration with Claude Code and other MCP-compatible clients.

Usage

See the template repository for a complete example, including Gemini CLI automated PR reviews : Example PR , Script.

mcp-demo

Features

  • AI Consultation: Get second opinions on code and technical decisions
  • Conversation History: Maintain context across consultations
  • Auto-consultation: Automatic AI consultation on uncertainty detection
  • Comparison Mode: Compare responses with previous Claude outputs
  • Rate Limiting: Built-in rate limiting to avoid API quota issues
  • Dual Mode Support: Runs in both stdio (for Claude Desktop) and HTTP modes

Important Requirements

⚠️ This server MUST run on the host system, not in a container!

The Gemini CLI requires Docker access to function properly, which means it cannot run inside a container itself (would require Docker-in-Docker). Always launch this server directly on your host machine.

Prerequisites

  1. Python 3.8+: Ensure Python is installed
  2. Gemini CLI: Install the Gemini CLI tool and ensure it's in your PATH
  3. Authentication: Configure Gemini CLI authentication with gemini auth
  4. Dependencies: Install required Python packages:
    pip install mcp fastapi uvicorn pydantic

Installation

  1. Clone this repository or download the files
  2. Install dependencies:
    pip install -r requirements.txt
  3. Configure environment variables (see Configuration section)

Running the Server

stdio Mode (Recommended)

# Run in stdio mode
python gemini_mcp_server.py --mode stdio

# With custom project root
python gemini_mcp_server.py --mode stdio --project-root /path/to/project

HTTP Mode

# Run in HTTP mode on port 8006
python gemini_mcp_server.py --mode http

# The server will be available at:
# http://localhost:8006

Available Tools

consult_gemini

Get AI assistance from Gemini for code review, problem-solving, or validation.

Parameters:

  • query (required): The question or code to consult Gemini about
  • context: Additional context for the consultation
  • comparison_mode: Compare with previous Claude response (default: true)
  • force: Force consultation even if disabled (default: false)

Example:

{
  "tool": "consult_gemini",
  "arguments": {
    "query": "Review this function for potential issues",
    "context": "def factorial(n): return 1 if n <= 1 else n * factorial(n-1)",
    "comparison_mode": true
  }
}

gemini_status

Get current status and statistics of the Gemini integration.

Example:

{
  "tool": "gemini_status",
  "arguments": {}
}

clear_gemini_history

Clear the conversation history to start fresh consultations.

Example:

{
  "tool": "clear_gemini_history",
  "arguments": {}
}

toggle_gemini_auto_consult

Enable or disable automatic Gemini consultation when uncertainty is detected.

Parameters:

  • enable: true to enable, false to disable, omit to toggle

Example:

{
  "tool": "toggle_gemini_auto_consult",
  "arguments": {"enable": true}
}

Configuration

Environment Variables

Create a .env file in your project root or set these environment variables:

# Enable/disable Gemini integration
GEMINI_ENABLED=true

# Auto-consultation on uncertainty
GEMINI_AUTO_CONSULT=false

# Gemini CLI command (if not in PATH)
GEMINI_CLI_COMMAND=gemini

# Request timeout in seconds
GEMINI_TIMEOUT=300

# Rate limit delay between requests
GEMINI_RATE_LIMIT=2

# Maximum context length
GEMINI_MAX_CONTEXT=4000

# Log consultations
GEMINI_LOG_CONSULTATIONS=true

# Gemini model to use
GEMINI_MODEL=gemini-2.5-pro

# Sandbox mode for testing
GEMINI_SANDBOX=false

# Debug mode
GEMINI_DEBUG=false

# Include conversation history
GEMINI_INCLUDE_HISTORY=true

# Maximum history entries
GEMINI_MAX_HISTORY=10

Configuration File

Create gemini-config.json in your project root:

{
  "enabled": true,
  "auto_consult": true,
  "timeout": 60,
  "model": "gemini-2.5-flash",
  "max_context_length": 4000,
  "rate_limit_delay": 2
}

Integration with Claude Desktop

Add to your Claude Desktop configuration (~/Library/Application Support/Claude/claude_desktop_config.json on macOS):

{
  "mcpServers": {
    "gemini": {
      "command": "python",
      "args": [
        "/path/to/gemini_mcp_server.py",
        "--mode", "stdio",
        "--project-root", "/path/to/your/project"
      ]
    }
  }
}

Testing

Test stdio Mode

python test_gemini_mcp.py --mode stdio

Test HTTP Mode

# Start the server in HTTP mode first
python gemini_mcp_server.py --mode http

# In another terminal, run tests
python test_gemini_mcp.py --mode http

Test State Management

python test_gemini_state.py

HTTP API Endpoints

When running in HTTP mode, the following endpoints are available:

  • GET /health - Health check
  • GET /mcp/tools - List available tools
  • POST /mcp/execute - Execute a tool
  • GET /mcp/stats - Server statistics
  • POST /messages - MCP protocol messages

Uncertainty Detection

The server automatically detects uncertainty in responses and can trigger automatic Gemini consultation. Detected patterns include:

  • Uncertainty phrases: "I'm not sure", "I think", "possibly", "probably"
  • Complex decisions: "multiple approaches", "trade-offs", "alternatives"
  • Critical operations: "production", "security", "authentication"

Example Workflow

# 1. Check status
status = await client.execute_tool("gemini_status")

# 2. Clear history for fresh start
await client.execute_tool("clear_gemini_history")

# 3. Consult about code
result = await client.execute_tool("consult_gemini", {
    "query": "Review this Python function for best practices",
    "context": "def process_data(data): return [x*2 for x in data if x > 0]"
})

# 4. Disable auto-consult if needed
await client.execute_tool("toggle_gemini_auto_consult", {"enable": False})

Troubleshooting

"Cannot run in container" Error

If you see this error, you're trying to run the server inside Docker. Exit the container and run on your host system.

Gemini CLI Not Found

  1. Install the Gemini CLI tool
  2. Add it to your PATH
  3. Or set GEMINI_CLI_COMMAND to the full path

Authentication Issues

  1. Run gemini auth to configure authentication
  2. Ensure your credentials are valid, avoid API key if you want to stick to free tier.
  3. Check the Gemini CLI documentation

Timeout Errors

  1. Increase GEMINI_TIMEOUT for complex queries
  2. Simplify your queries
  3. Check network connectivity

Security Considerations

  • API keys are managed by the Gemini CLI
  • No credentials are stored in the MCP server
  • Consultation logs can be disabled for sensitive code
  • Sandbox mode available for testing without API calls

Best Practices

  1. Clear History Regularly: Clear conversation history when switching contexts
  2. Provide Context: Include relevant context for better AI responses
  3. Rate Limiting: Respect rate limits to avoid API quota issues
  4. Error Handling: Always handle potential timeout or API errors
  5. Comparison Mode: Use comparison mode to get diverse perspectives
# Gemini MCP Server Configuration
# Copy this file to .env and update with your settings
# Enable/disable Gemini integration
GEMINI_ENABLED=true
# Auto-consultation on uncertainty detection
GEMINI_AUTO_CONSULT=false
# Gemini CLI command (if not in PATH)
GEMINI_CLI_COMMAND=gemini
# Request timeout in seconds
GEMINI_TIMEOUT=300
# Rate limit delay between requests (seconds)
GEMINI_RATE_LIMIT=2
# Maximum context length (characters)
GEMINI_MAX_CONTEXT=4000
# Log consultations
GEMINI_LOG_CONSULTATIONS=true
# Gemini model to use
GEMINI_MODEL=gemini-2.5-pro
# Sandbox mode for testing (no actual API calls)
GEMINI_SANDBOX=false
# Debug mode (verbose logging)
GEMINI_DEBUG=false
# Include conversation history in consultations
GEMINI_INCLUDE_HISTORY=true
# Maximum number of history entries to keep
GEMINI_MAX_HISTORY=10
{
"enabled": true,
"auto_consult": false,
"cli_command": "gemini",
"timeout": 300,
"rate_limit_delay": 2.0,
"max_context_length": 4000,
"log_consultations": true,
"model": "gemini-2.5-pro",
"sandbox_mode": false,
"debug_mode": false,
"include_history": true,
"max_history_entries": 10
}
#!/usr/bin/env python3
"""
Gemini CLI Integration Module
Provides automatic consultation with Gemini for second opinions and validation
"""
import asyncio
import logging
import re
import time
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Uncertainty patterns that trigger automatic Gemini consultation
UNCERTAINTY_PATTERNS = [
r"\bI'm not sure\b",
r"\bI think\b",
r"\bpossibly\b",
r"\bprobably\b",
r"\bmight be\b",
r"\bcould be\b",
r"\bI believe\b",
r"\bIt seems\b",
r"\bappears to be\b",
r"\buncertain\b",
r"\bI would guess\b",
r"\blikely\b",
r"\bperhaps\b",
r"\bmaybe\b",
r"\bI assume\b",
]
# Complex decision patterns that benefit from second opinions
COMPLEX_DECISION_PATTERNS = [
r"\bmultiple approaches\b",
r"\bseveral options\b",
r"\btrade-offs?\b",
r"\bconsider(?:ing)?\b",
r"\balternatives?\b",
r"\bpros and cons\b",
r"\bweigh(?:ing)? the options\b",
r"\bchoice between\b",
r"\bdecision\b",
]
# Critical operations that should trigger consultation
CRITICAL_OPERATION_PATTERNS = [
r"\bproduction\b",
r"\bdatabase migration\b",
r"\bsecurity\b",
r"\bauthentication\b",
r"\bencryption\b",
r"\bAPI key\b",
r"\bcredentials?\b",
r"\bperformance\s+critical\b",
]
class GeminiIntegration:
"""Handles Gemini CLI integration for second opinions and validation"""
def __init__(self, config: Optional[Dict[str, Any]] = None):
self.config = config or {}
self.enabled = self.config.get("enabled", True)
self.auto_consult = self.config.get("auto_consult", True)
self.cli_command = self.config.get("cli_command", "gemini")
self.timeout = self.config.get("timeout", 60)
self.rate_limit_delay = self.config.get("rate_limit_delay", 2.0)
self.last_consultation = 0
self.consultation_log: List[Dict[str, Any]] = []
self.max_context_length = self.config.get("max_context_length", 4000)
self.model = self.config.get("model", "gemini-2.5-flash")
# Conversation history for maintaining state
self.conversation_history: List[Tuple[str, str]] = []
self.max_history_entries = self.config.get("max_history_entries", 10)
self.include_history = self.config.get("include_history", True)
async def consult_gemini(
self,
query: str,
context: str = "",
comparison_mode: bool = True,
force_consult: bool = False,
) -> Dict[str, Any]:
"""Consult Gemini CLI for second opinion"""
if not self.enabled and not force_consult:
return {"status": "disabled", "message": "Gemini integration is disabled"}
if not force_consult:
await self._enforce_rate_limit()
consultation_id = f"consult_{int(time.time())}_{len(self.consultation_log)}"
try:
# Prepare query with context
full_query = self._prepare_query(query, context, comparison_mode)
# Execute Gemini CLI command
result = await self._execute_gemini_cli(full_query)
# Save to conversation history
if self.include_history and result.get("output"):
self.conversation_history.append((query, result["output"]))
# Trim history if it exceeds max entries
if len(self.conversation_history) > self.max_history_entries:
self.conversation_history = self.conversation_history[-self.max_history_entries :]
# Log consultation
if self.config.get("log_consultations", True):
self.consultation_log.append(
{
"id": consultation_id,
"timestamp": datetime.now().isoformat(),
"query": (query[:200] + "..." if len(query) > 200 else query),
"status": "success",
"execution_time": result.get("execution_time", 0),
}
)
return {
"status": "success",
"response": result["output"],
"execution_time": result["execution_time"],
"consultation_id": consultation_id,
"timestamp": datetime.now().isoformat(),
}
except Exception as e:
logger.error(f"Error consulting Gemini: {str(e)}")
return {
"status": "error",
"error": str(e),
"consultation_id": consultation_id,
}
def detect_uncertainty(self, text: str) -> Tuple[bool, List[str]]:
"""Detect if text contains uncertainty patterns"""
found_patterns = []
# Check uncertainty patterns
for pattern in UNCERTAINTY_PATTERNS:
if re.search(pattern, text, re.IGNORECASE):
found_patterns.append(f"uncertainty: {pattern}")
# Check complex decision patterns
for pattern in COMPLEX_DECISION_PATTERNS:
if re.search(pattern, text, re.IGNORECASE):
found_patterns.append(f"complex_decision: {pattern}")
# Check critical operation patterns
for pattern in CRITICAL_OPERATION_PATTERNS:
if re.search(pattern, text, re.IGNORECASE):
found_patterns.append(f"critical_operation: {pattern}")
return len(found_patterns) > 0, found_patterns
def clear_conversation_history(self) -> Dict[str, Any]:
"""Clear the conversation history"""
old_count = len(self.conversation_history)
self.conversation_history = []
return {
"status": "success",
"cleared_entries": old_count,
"message": f"Cleared {old_count} conversation entries",
}
def get_consultation_stats(self) -> Dict[str, Any]:
"""Get statistics about consultations"""
if not self.consultation_log:
return {"total_consultations": 0}
completed = [e for e in self.consultation_log if e.get("status") == "success"]
return {
"total_consultations": len(self.consultation_log),
"completed_consultations": len(completed),
"average_execution_time": (
sum(e.get("execution_time", 0) for e in completed) / len(completed) if completed else 0
),
"conversation_history_size": len(self.conversation_history),
}
async def _enforce_rate_limit(self):
"""Enforce rate limiting between consultations"""
current_time = time.time()
time_since_last = current_time - self.last_consultation
if time_since_last < self.rate_limit_delay:
sleep_time = self.rate_limit_delay - time_since_last
await asyncio.sleep(sleep_time)
self.last_consultation = time.time()
def _prepare_query(self, query: str, context: str, comparison_mode: bool) -> str:
"""Prepare the full query for Gemini CLI"""
parts = []
if comparison_mode:
parts.append("Please provide a technical analysis and second opinion:")
parts.append("")
# Include conversation history if enabled and available
if self.include_history and self.conversation_history:
parts.append("Previous conversation:")
parts.append("-" * 40)
for i, (prev_q, prev_a) in enumerate(self.conversation_history[-self.max_history_entries :], 1):
parts.append(f"Q{i}: {prev_q}")
# Truncate long responses in history
if len(prev_a) > 500:
parts.append(f"A{i}: {prev_a[:500]}... [truncated]")
else:
parts.append(f"A{i}: {prev_a}")
parts.append("")
parts.append("-" * 40)
parts.append("")
# Truncate context if too long
if len(context) > self.max_context_length:
context = context[: self.max_context_length] + "\n[Context truncated...]"
if context:
parts.append("Context:")
parts.append(context)
parts.append("")
parts.append("Current Question/Topic:")
parts.append(query)
if comparison_mode:
parts.extend(
[
"",
"Please structure your response with:",
"1. Your analysis and understanding",
"2. Recommendations or approach",
"3. Any concerns or considerations",
"4. Alternative approaches (if applicable)",
]
)
return "\n".join(parts)
async def _execute_gemini_cli(self, query: str) -> Dict[str, Any]:
"""Execute Gemini CLI command and return results"""
start_time = time.time()
# Build command
cmd = [self.cli_command]
if self.model:
cmd.extend(["-m", self.model])
cmd.extend(["-p", query]) # Non-interactive mode
try:
process = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=self.timeout)
execution_time = time.time() - start_time
if process.returncode != 0:
error_msg = stderr.decode() if stderr else "Unknown error"
if "authentication" in error_msg.lower():
error_msg += "\nTip: Run 'gemini' interactively to authenticate"
raise Exception(f"Gemini CLI failed: {error_msg}")
return {"output": stdout.decode().strip(), "execution_time": execution_time}
except asyncio.TimeoutError:
raise Exception(f"Gemini CLI timed out after {self.timeout} seconds")
# Singleton pattern implementation
_integration = None
def get_integration(config: Optional[Dict[str, Any]] = None) -> GeminiIntegration:
"""
Get or create the global Gemini integration instance.
This ensures that all parts of the application share the same instance,
maintaining consistent state for rate limiting, consultation history,
and configuration across all tool calls.
Args:
config: Optional configuration dict. Only used on first call.
Returns:
The singleton GeminiIntegration instance
"""
global _integration
if _integration is None:
_integration = GeminiIntegration(config)
return _integration
#!/usr/bin/env python3
"""
Gemini AI Integration MCP Server
Provides development workflow automation with AI second opinions
"""
import asyncio
import json
import logging
import os
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
import mcp.server.stdio
import mcp.types as types
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse, RedirectResponse, Response
from mcp.server import InitializationOptions, NotificationOptions, Server
from pydantic import BaseModel
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def check_container_environment():
"""Check if running in a container"""
if os.path.exists("/.dockerenv") or os.environ.get("CONTAINER_ENV"):
return True
return False
def setup_logging(name: str):
"""Setup logging for the server"""
logger = logging.getLogger(name)
logger.setLevel(logging.INFO)
# Create console handler with formatting
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
# Add handler to logger
logger.addHandler(ch)
return logger
class ToolRequest(BaseModel):
"""Model for tool execution requests"""
tool: str
arguments: Optional[Dict[str, Any]] = None
parameters: Optional[Dict[str, Any]] = None
client_id: Optional[str] = None
def get_args(self) -> Dict[str, Any]:
"""Get arguments, supporting both 'arguments' and 'parameters' fields"""
return self.arguments or self.parameters or {}
class ToolResponse(BaseModel):
"""Model for tool execution responses"""
success: bool
result: Any
error: Optional[str] = None
class GeminiMCPServer:
"""MCP Server for Gemini AI integration and consultation"""
def __init__(self, project_root: Optional[str] = None):
# Check if running in container and exit if true
if check_container_environment():
print(
"ERROR: Gemini MCP Server cannot run inside a container!",
file=sys.stderr,
)
print(
"The Gemini CLI requires Docker access and must run on the host system.",
file=sys.stderr,
)
print("Please launch this server directly on the host with:", file=sys.stderr)
print(" python gemini_mcp_server.py", file=sys.stderr)
sys.exit(1)
# Initialize base server attributes
self.name = "Gemini MCP Server"
self.version = "1.0.0"
self.port = 8006 # Standard Gemini MCP port
self.logger = setup_logging("GeminiMCP")
self.app = FastAPI(title=self.name, version=self.version)
self._setup_routes()
self._setup_events()
self.project_root = Path(project_root) if project_root else Path.cwd()
# Initialize Gemini integration
self.gemini_config = self._load_gemini_config()
self.gemini = self._initialize_gemini()
# Track uncertainty for auto-consultation
self.last_response_uncertainty = None
def _setup_events(self):
"""Setup startup/shutdown events"""
@self.app.on_event("startup")
async def startup_event():
self.logger.info(f"{self.name} starting on port {self.port}")
self.logger.info(f"Server version: {self.version}")
self.logger.info("Server initialized successfully")
def _setup_routes(self):
"""Setup common HTTP routes"""
self.app.get("/health")(self.health_check)
self.app.get("/mcp/tools")(self.list_tools)
self.app.post("/mcp/execute")(self.execute_tool)
self.app.post("/mcp/register")(self.register_client)
self.app.post("/register")(self.register_client_oauth)
self.app.post("/oauth/register")(self.register_client_oauth)
self.app.get("/authorize")(self.oauth_authorize_bypass)
self.app.post("/authorize")(self.oauth_authorize_bypass)
self.app.get("/oauth/authorize")(self.oauth_authorize_bypass)
self.app.post("/oauth/authorize")(self.oauth_authorize_bypass)
self.app.post("/token")(self.oauth_token_bypass)
self.app.post("/oauth/token")(self.oauth_token_bypass)
self.app.get("/mcp/clients")(self.list_clients)
self.app.get("/mcp/clients/{client_id}")(self.get_client_info)
self.app.get("/mcp/stats")(self.get_stats)
self.app.get("/.well-known/oauth-authorization-server")(self.oauth_discovery)
self.app.get("/.well-known/oauth-authorization-server/mcp")(self.oauth_discovery)
self.app.get("/.well-known/oauth-authorization-server/messages")(self.oauth_discovery)
self.app.get("/.well-known/oauth-protected-resource")(self.oauth_protected_resource)
self.app.get("/.well-known/mcp")(self.mcp_discovery)
self.app.post("/mcp/initialize")(self.mcp_initialize)
self.app.get("/mcp/capabilities")(self.mcp_capabilities)
self.app.get("/messages")(self.handle_messages_get)
self.app.post("/messages")(self.handle_messages)
self.app.get("/mcp")(self.handle_mcp_get)
self.app.post("/mcp")(self.handle_jsonrpc)
self.app.options("/mcp")(self.handle_options)
self.app.post("/mcp/rpc")(self.handle_jsonrpc)
self.app.get("/mcp/sse")(self.handle_mcp_sse)
async def health_check(self):
"""Health check endpoint"""
return {"status": "healthy", "server": self.name, "version": self.version}
async def register_client(self, request: Dict[str, Any]):
"""Register a client - simplified for home lab use"""
client_name = request.get("client", request.get("client_name", "unknown"))
client_id = request.get("client_id", f"{client_name}_simple")
self.logger.info(f"Client registration request from: {client_name}")
return {
"status": "registered",
"client": client_name,
"client_id": client_id,
"server": self.name,
"version": self.version,
"registration": {
"client_id": client_id,
"client_name": client_name,
"registered": True,
"is_update": False,
"registration_time": datetime.utcnow().isoformat(),
"server_time": datetime.utcnow().isoformat(),
},
}
async def register_client_oauth(self, request_data: Dict[str, Any], request: Request):
"""OAuth2-style client registration - simplified for home lab use"""
redirect_uris = request_data.get("redirect_uris", [])
client_name = request_data.get("client_name", request_data.get("client", "claude-code"))
client_id = f"{client_name}_oauth"
self.logger.info(f"OAuth registration request from: {client_name}")
return {
"client_id": client_id,
"client_name": client_name,
"redirect_uris": redirect_uris if redirect_uris else ["http://localhost"],
"grant_types": request_data.get("grant_types", ["authorization_code"]),
"response_types": request_data.get("response_types", ["code"]),
"token_endpoint_auth_method": request_data.get("token_endpoint_auth_method", "none"),
"registration_access_token": "not-required-for-local-mcp",
"registration_client_uri": f"{request.url.scheme}://{request.url.netloc}/mcp/clients/{client_id}",
"client_id_issued_at": int(datetime.utcnow().timestamp()),
"client_secret_expires_at": 0,
}
async def oauth_authorize_bypass(self, request: Request):
"""Bypass OAuth2 authorization - immediately approve without auth"""
params = dict(request.query_params)
redirect_uri = params.get("redirect_uri", "http://localhost")
state = params.get("state", "")
auth_code = "bypass-auth-code-no-auth-required"
separator = "&" if "?" in redirect_uri else "?"
redirect_url = f"{redirect_uri}{separator}code={auth_code}"
if state:
redirect_url += f"&state={state}"
return RedirectResponse(url=redirect_url, status_code=302)
async def oauth_token_bypass(self, request: Request):
"""Bypass OAuth2 token exchange - immediately return access token"""
try:
if request.headers.get("content-type", "").startswith("application/json"):
request_data = await request.json()
else:
form_data = await request.form()
request_data = dict(form_data)
except Exception:
request_data = {}
self.logger.info(f"Token request data: {request_data}")
return {
"access_token": "bypass-token-no-auth-required",
"token_type": "Bearer",
"expires_in": 31536000,
"scope": "full_access",
"refresh_token": "bypass-refresh-token-no-auth-required",
}
async def oauth_discovery(self, request: Request):
"""OAuth 2.0 authorization server metadata"""
base_url = f"{request.url.scheme}://{request.url.netloc}"
return {
"issuer": base_url,
"authorization_endpoint": f"{base_url}/authorize",
"token_endpoint": f"{base_url}/token",
"registration_endpoint": f"{base_url}/register",
"token_endpoint_auth_methods_supported": ["none"],
"response_types_supported": ["code"],
"grant_types_supported": ["authorization_code"],
"code_challenge_methods_supported": ["S256"],
"registration_endpoint_auth_methods_supported": ["none"],
}
async def oauth_protected_resource(self, request: Request):
"""OAuth 2.0 protected resource metadata"""
base_url = f"{request.url.scheme}://{request.url.netloc}"
return {
"resource": f"{base_url}/messages",
"authorization_servers": [base_url],
}
async def handle_mcp_get(self, request: Request):
"""Handle GET requests to /mcp endpoint for SSE streaming"""
import uuid
from fastapi.responses import StreamingResponse
session_id = request.headers.get("Mcp-Session-Id", str(uuid.uuid4()))
async def event_generator():
connection_data = {
"type": "connection",
"sessionId": session_id,
"status": "connected",
}
yield f"data: {json.dumps(connection_data)}\n\n"
while True:
await asyncio.sleep(15)
ping_data = {"type": "ping", "timestamp": datetime.utcnow().isoformat()}
yield f"data: {json.dumps(ping_data)}\n\n"
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
"Mcp-Session-Id": session_id,
},
)
async def handle_mcp_sse(self, request: Request):
"""Handle SSE requests for authenticated clients"""
from fastapi.responses import StreamingResponse
auth_header = request.headers.get("authorization", "")
if not auth_header.startswith("Bearer "):
raise HTTPException(status_code=401, detail="Unauthorized")
async def event_generator():
yield f"data: {json.dumps({'type': 'connected', 'message': 'SSE connection established'})}\n\n"
while True:
await asyncio.sleep(30)
yield f"data: {json.dumps({'type': 'ping'})}\n\n"
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
async def handle_messages_get(self, request: Request):
"""Handle GET requests to /messages endpoint"""
return {
"protocol": "mcp",
"version": "1.0",
"server": {
"name": self.name,
"version": self.version,
"description": f"{self.name} MCP Server",
},
"auth": {
"required": False,
"type": "none",
},
"transport": {
"type": "streamable-http",
"endpoint": "/messages",
},
}
async def handle_messages(self, request: Request):
"""Handle POST requests to /messages endpoint (HTTP Stream Transport)"""
session_id = request.headers.get("Mcp-Session-Id")
response_mode = request.headers.get("Mcp-Response-Mode", "batch").lower()
protocol_version = request.headers.get("MCP-Protocol-Version")
self.logger.info(f"Messages request headers: {dict(request.headers)}")
self.logger.info(f"Session ID: {session_id}, Response Mode: {response_mode}, Protocol Version: {protocol_version}")
try:
body = await request.json()
self.logger.info(f"Messages request body: {json.dumps(body)}")
is_init_request = False
if isinstance(body, dict) and body.get("method") == "initialize":
is_init_request = True
if not session_id:
import uuid
session_id = str(uuid.uuid4())
self.logger.info(f"Generated new session ID: {session_id}")
if response_mode == "stream":
from fastapi.responses import StreamingResponse
async def event_generator():
if session_id:
yield f"data: {json.dumps({'type': 'session', 'sessionId': session_id})}\n\n"
if isinstance(body, list):
for req in body:
response = await self._process_jsonrpc_request(req)
if response:
yield f"data: {json.dumps(response)}\n\n"
else:
response = await self._process_jsonrpc_request(body)
if response:
yield f"data: {json.dumps(response)}\n\n"
yield f"data: {json.dumps({'type': 'completion'})}\n\n"
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
"Mcp-Session-Id": session_id or "",
},
)
else:
if isinstance(body, list):
responses = []
has_notifications = False
for req in body:
response = await self._process_jsonrpc_request(req)
if response is None:
has_notifications = True
else:
responses.append(response)
if not responses and has_notifications:
return Response(
status_code=202,
headers={
"Mcp-Session-Id": session_id or "",
},
)
return JSONResponse(
content=responses,
headers={
"Content-Type": "application/json",
"Mcp-Session-Id": session_id or "",
},
)
else:
response = await self._process_jsonrpc_request(body)
if response is None:
return Response(
status_code=202,
headers={
"Mcp-Session-Id": session_id or "",
},
)
else:
if is_init_request and session_id:
self.logger.info(f"Returning session ID in response: {session_id}")
return JSONResponse(
content=response,
headers={
"Content-Type": "application/json",
"Mcp-Session-Id": session_id or "",
},
)
except Exception as e:
self.logger.error(f"Messages endpoint error: {e}")
return JSONResponse(
content={
"jsonrpc": "2.0",
"error": {"code": -32700, "message": "Parse error", "data": str(e)},
"id": None,
},
status_code=400,
headers={
"Content-Type": "application/json",
"Mcp-Session-Id": session_id or "",
},
)
async def handle_jsonrpc(self, request: Request):
"""Handle JSON-RPC 2.0 requests for MCP protocol"""
return await self.handle_messages(request)
async def handle_options(self, request: Request):
"""Handle OPTIONS requests for CORS preflight"""
return Response(
content="",
headers={
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET, POST, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type, Authorization, Mcp-Session-Id, Mcp-Response-Mode",
"Access-Control-Max-Age": "86400",
},
)
async def _process_jsonrpc_request(self, request: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Process a single JSON-RPC request"""
jsonrpc = request.get("jsonrpc", "2.0")
method = request.get("method")
params = request.get("params", {})
req_id = request.get("id")
self.logger.info(f"JSON-RPC request: method={method}, id={req_id}")
is_notification = req_id is None
try:
if method == "initialize":
result = await self._jsonrpc_initialize(params)
elif method == "initialized":
self.logger.info("Client sent initialized notification")
if is_notification:
return None
result = {"status": "acknowledged"}
elif method == "tools/list":
result = await self._jsonrpc_list_tools(params)
elif method == "tools/call":
result = await self._jsonrpc_call_tool(params)
elif method == "completion/complete":
result = {"error": "Completions not supported"}
elif method == "ping":
result = {"pong": True}
else:
if not is_notification:
return {
"jsonrpc": jsonrpc,
"error": {
"code": -32601,
"message": f"Method not found: {method}",
},
"id": req_id,
}
return None
if not is_notification:
response = {"jsonrpc": jsonrpc, "result": result, "id": req_id}
self.logger.info(f"JSON-RPC response: {json.dumps(response)}")
if method == "initialize" and "protocolVersion" in result:
self.logger.info("Initialization complete, ready for tools/list request")
self.logger.info("Expecting client to send 'tools/list' request next")
return response
return None
except Exception as e:
self.logger.error(f"Error processing method {method}: {e}")
if not is_notification:
return {
"jsonrpc": jsonrpc,
"error": {
"code": -32603,
"message": "Internal error",
"data": str(e),
},
"id": req_id,
}
return None
async def _jsonrpc_initialize(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""Handle initialize request"""
client_info = params.get("clientInfo", {})
protocol_version = params.get("protocolVersion", "2024-11-05")
self.logger.info(f"Client info: {client_info}, requested protocol: {protocol_version}")
self._protocol_version = protocol_version
return {
"protocolVersion": protocol_version,
"serverInfo": {"name": self.name, "version": self.version},
"capabilities": {
"tools": {"listChanged": True},
"resources": {},
"prompts": {},
},
}
async def _jsonrpc_list_tools(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""Handle tools/list request"""
tools = self.get_tools()
self.logger.info(f"Available tools from get_tools(): {list(tools.keys())}")
tool_list = []
for tool_name, tool_info in tools.items():
tool_list.append(
{
"name": tool_name,
"description": tool_info.get("description", ""),
"inputSchema": tool_info.get("parameters", {}),
}
)
self.logger.info(f"Returning {len(tool_list)} tools to client")
return {"tools": tool_list}
async def _jsonrpc_call_tool(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""Handle tools/call request"""
tool_name = params.get("name")
arguments = params.get("arguments", {})
if not tool_name:
raise ValueError("Tool name is required")
tools = self.get_tools()
if tool_name not in tools:
raise ValueError(f"Tool '{tool_name}' not found")
tool_func = getattr(self, tool_name, None)
if not tool_func:
raise ValueError(f"Tool '{tool_name}' not implemented")
try:
result = await tool_func(**arguments)
if isinstance(result, dict):
content_text = json.dumps(result, indent=2)
else:
content_text = str(result)
return {"content": [{"type": "text", "text": content_text}]}
except Exception as e:
self.logger.error(f"Error calling tool {tool_name}: {e}")
return {
"content": [{"type": "text", "text": f"Error executing {tool_name}: {str(e)}"}],
"isError": True,
}
async def mcp_discovery(self):
"""MCP protocol discovery endpoint"""
return {
"mcp_version": "1.0",
"server_name": self.name,
"server_version": self.version,
"capabilities": {
"tools": True,
"prompts": False,
"resources": False,
},
"endpoints": {
"tools": "/mcp/tools",
"execute": "/mcp/execute",
"initialize": "/mcp/initialize",
"capabilities": "/mcp/capabilities",
},
}
async def mcp_info(self):
"""MCP server information"""
return {
"protocol": "mcp",
"version": "1.0",
"server": {
"name": self.name,
"version": self.version,
"description": f"{self.name} MCP Server",
},
"auth": {
"required": False,
"type": "none",
},
}
async def mcp_initialize(self, request: Dict[str, Any]):
"""Initialize MCP session"""
client_info = request.get("client", {})
return {
"session_id": f"session-{client_info.get('name', 'unknown')}-{int(datetime.utcnow().timestamp())}",
"server": {
"name": self.name,
"version": self.version,
},
"capabilities": {
"tools": True,
"prompts": False,
"resources": False,
},
}
async def mcp_capabilities(self):
"""Return server capabilities"""
tools = self.get_tools()
return {
"capabilities": {
"tools": {
"list": list(tools.keys()),
"count": len(tools),
},
"prompts": {
"supported": False,
},
"resources": {
"supported": False,
},
},
}
async def list_tools(self):
"""List available tools"""
tools = self.get_tools()
return {
"tools": [
{
"name": tool_name,
"description": tool_info.get("description", ""),
"parameters": tool_info.get("parameters", {}),
}
for tool_name, tool_info in tools.items()
]
}
async def execute_tool(self, request: ToolRequest):
"""Execute a tool with given arguments"""
try:
tools = self.get_tools()
if request.tool not in tools:
raise HTTPException(status_code=404, detail=f"Tool '{request.tool}' not found")
tool_func = getattr(self, request.tool, None)
if not tool_func:
raise HTTPException(status_code=501, detail=f"Tool '{request.tool}' not implemented")
result = await tool_func(**request.get_args())
return ToolResponse(success=True, result=result)
except Exception as e:
self.logger.error(f"Error executing tool {request.tool}: {str(e)}")
return ToolResponse(success=False, result=None, error=str(e))
async def list_clients(self, active_only: bool = True):
"""List clients - returns empty for home lab use"""
return {"clients": [], "count": 0, "active_only": active_only}
async def get_client_info(self, client_id: str):
"""Get client info - returns simple response for home lab use"""
return {
"client_id": client_id,
"client_name": client_id.replace("_oauth", "").replace("_simple", ""),
"active": True,
"registered_at": datetime.utcnow().isoformat(),
}
async def get_stats(self):
"""Get server statistics - simplified for home lab use"""
return {
"server": {
"name": self.name,
"version": self.version,
"tools_count": len(self.get_tools()),
},
"clients": {
"total_clients": 0,
"active_clients": 0,
"inactive_clients": 0,
"clients_active_last_hour": 0,
"total_requests": 0,
},
}
def _load_gemini_config(self) -> Dict[str, Any]:
"""Load Gemini configuration from environment or config file"""
# Try to load .env file if it exists
env_file = self.project_root / ".env"
if env_file.exists():
try:
with open(env_file, "r") as f:
for line in f:
line = line.strip()
if line and not line.startswith("#") and "=" in line:
key, value = line.split("=", 1)
# Only set if not already in environment
if key not in os.environ:
os.environ[key] = value
except Exception as e:
self.logger.warning(f"Could not load .env file: {e}")
config = {
"enabled": os.getenv("GEMINI_ENABLED", "true").lower() == "true",
"auto_consult": os.getenv("GEMINI_AUTO_CONSULT", "true").lower() == "true",
"cli_command": os.getenv("GEMINI_CLI_COMMAND", "gemini"),
"timeout": int(os.getenv("GEMINI_TIMEOUT", "60")),
"rate_limit_delay": float(os.getenv("GEMINI_RATE_LIMIT", "2")),
"max_context_length": int(os.getenv("GEMINI_MAX_CONTEXT", "4000")),
"log_consultations": os.getenv("GEMINI_LOG_CONSULTATIONS", "true").lower() == "true",
"model": os.getenv("GEMINI_MODEL", "gemini-2.5-flash"),
"sandbox_mode": os.getenv("GEMINI_SANDBOX", "false").lower() == "true",
"debug_mode": os.getenv("GEMINI_DEBUG", "false").lower() == "true",
"include_history": os.getenv("GEMINI_INCLUDE_HISTORY", "true").lower() == "true",
"max_history_entries": int(os.getenv("GEMINI_MAX_HISTORY", "10")),
}
# Try to load from config file
config_file = self.project_root / "gemini-config.json"
if config_file.exists():
try:
with open(config_file, "r") as f:
file_config = json.load(f)
config.update(file_config)
except Exception as e:
self.logger.warning(f"Could not load gemini-config.json: {e}")
return config
def _initialize_gemini(self):
"""Initialize Gemini integration with lazy loading"""
try:
from gemini_integration import get_integration
return get_integration(self.gemini_config)
except ImportError as e:
self.logger.error(f"Failed to import Gemini integration: {e}")
# Return a mock object that always returns disabled status
class MockGemini:
def __init__(self):
self.auto_consult = False
self.enabled = False
async def consult_gemini(self, **kwargs):
return {
"status": "disabled",
"error": "Gemini integration not available",
}
def clear_conversation_history(self):
return {"message": "Gemini integration not available"}
def get_statistics(self):
return {}
return MockGemini()
def get_tools(self) -> Dict[str, Dict[str, Any]]:
"""Return available Gemini tools"""
return {
"consult_gemini": {
"description": "Consult Gemini AI for a second opinion or validation",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The question or code to consult Gemini about",
},
"context": {
"type": "string",
"description": "Additional context for the consultation",
},
"comparison_mode": {
"type": "boolean",
"default": True,
"description": "Compare with previous Claude response",
},
"force": {
"type": "boolean",
"default": False,
"description": "Force consultation even if disabled",
},
},
"required": ["query"],
},
},
"clear_gemini_history": {
"description": "Clear Gemini conversation history",
"parameters": {"type": "object", "properties": {}},
},
"gemini_status": {
"description": "Get Gemini integration status and statistics",
"parameters": {"type": "object", "properties": {}},
},
"toggle_gemini_auto_consult": {
"description": "Toggle automatic Gemini consultation on uncertainty detection",
"parameters": {
"type": "object",
"properties": {
"enable": {
"type": "boolean",
"description": "Enable or disable auto-consultation",
}
},
},
},
}
async def consult_gemini(
self,
query: str,
context: str = "",
comparison_mode: bool = True,
force: bool = False,
) -> Dict[str, Any]:
"""Consult Gemini AI for a second opinion
Args:
query: The question or code to consult about
context: Additional context
comparison_mode: Compare with previous Claude response
force: Force consultation even if disabled
Returns:
Dictionary with consultation results
"""
if not query:
return {
"success": False,
"error": "'query' parameter is required for Gemini consultation",
}
# Consult Gemini
result = await self.gemini.consult_gemini(
query=query,
context=context,
comparison_mode=comparison_mode,
force_consult=force,
)
# Format the response
formatted_response = self._format_gemini_response(result)
return {
"success": result.get("status") == "success",
"result": formatted_response,
"raw_result": result,
}
async def clear_gemini_history(self) -> Dict[str, Any]:
"""Clear Gemini conversation history"""
result = self.gemini.clear_conversation_history()
return {"success": True, "message": result.get("message", "History cleared")}
async def gemini_status(self) -> Dict[str, Any]:
"""Get Gemini integration status and statistics"""
stats = self.gemini.get_statistics() if hasattr(self.gemini, "get_statistics") else {}
status_info = {
"enabled": getattr(self.gemini, "enabled", False),
"auto_consult": getattr(self.gemini, "auto_consult", False),
"model": self.gemini_config.get("model", "unknown"),
"timeout": self.gemini_config.get("timeout", 60),
"statistics": stats,
}
return {"success": True, "status": status_info}
async def toggle_gemini_auto_consult(self, enable: Optional[bool] = None) -> Dict[str, Any]:
"""Toggle automatic Gemini consultation
Args:
enable: True to enable, False to disable, None to toggle
Returns:
Dictionary with new status
"""
if enable is None:
# Toggle current state
self.gemini.auto_consult = not getattr(self.gemini, "auto_consult", False)
else:
self.gemini.auto_consult = bool(enable)
status = "enabled" if self.gemini.auto_consult else "disabled"
return {
"success": True,
"status": status,
"message": f"Gemini auto-consultation is now {status}",
}
def _format_gemini_response(self, result: Dict[str, Any]) -> str:
"""Format Gemini consultation response"""
output_lines = []
output_lines.append("🤖 Gemini Consultation Response")
output_lines.append("=" * 40)
output_lines.append("")
if result["status"] == "success":
output_lines.append(f"✅ Consultation ID: {result.get('consultation_id', 'N/A')}")
output_lines.append(f"⏱️ Execution time: {result.get('execution_time', 0):.2f}s")
output_lines.append("")
# Display the raw response
response = result.get("response", "")
if response:
output_lines.append("📄 Response:")
output_lines.append(response)
elif result["status"] == "disabled":
output_lines.append("ℹ️ Gemini consultation is currently disabled")
output_lines.append("💡 Enable with: toggle_gemini_auto_consult")
elif result["status"] == "timeout":
output_lines.append(f"❌ {result.get('error', 'Timeout error')}")
output_lines.append("💡 Try increasing the timeout or simplifying the query")
else: # error
output_lines.append(f"❌ Error: {result.get('error', 'Unknown error')}")
output_lines.append("")
output_lines.append("💡 Troubleshooting:")
output_lines.append(" 1. Check if Gemini CLI is installed and in PATH")
output_lines.append(" 2. Verify Gemini CLI authentication")
output_lines.append(" 3. Check the logs for more details")
return "\n".join(output_lines)
async def run_stdio(self):
"""Run the server in stdio mode (for Claude desktop app)"""
server = Server(self.name)
# Store tools and their functions for later access
self._tools = self.get_tools()
self._tool_funcs = {}
for tool_name, tool_info in self._tools.items():
tool_func = getattr(self, tool_name, None)
if tool_func:
self._tool_funcs[tool_name] = tool_func
@server.list_tools()
async def list_tools() -> List[types.Tool]:
"""List available tools"""
tools = []
for tool_name, tool_info in self._tools.items():
tools.append(
types.Tool(
name=tool_name,
description=tool_info.get("description", ""),
inputSchema=tool_info.get("parameters", {}),
)
)
return tools
@server.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any]) -> List[types.TextContent]:
"""Call a tool with given arguments"""
if name not in self._tool_funcs:
return [types.TextContent(type="text", text=f"Tool '{name}' not found")]
try:
# Call the tool function
result = await self._tool_funcs[name](**arguments)
# Convert result to MCP response format
if isinstance(result, dict):
return [types.TextContent(type="text", text=json.dumps(result, indent=2))]
return [types.TextContent(type="text", text=str(result))]
except Exception as e:
self.logger.error(f"Error calling tool {name}: {str(e)}")
return [types.TextContent(type="text", text=f"Error: {str(e)}")]
# Run the stdio server
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
InitializationOptions(
server_name=self.name,
server_version=self.version,
capabilities=server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={},
),
),
)
def run_http(self):
"""Run the server in HTTP mode"""
import uvicorn
uvicorn.run(self.app, host="0.0.0.0", port=self.port)
def run(self, mode: str = "http"):
"""Run the server in specified mode"""
if mode == "stdio":
asyncio.run(self.run_stdio())
elif mode == "http":
self.run_http()
else:
raise ValueError(f"Unknown mode: {mode}. Use 'stdio' or 'http'.")
def main():
"""Run the Gemini MCP Server"""
import argparse
parser = argparse.ArgumentParser(description="Gemini AI Integration MCP Server")
parser.add_argument(
"--mode",
choices=["http", "stdio"],
default="stdio", # Default to stdio for Gemini
help="Server mode (http or stdio)",
)
parser.add_argument("--project-root", default=None, help="Project root directory")
args = parser.parse_args()
server = GeminiMCPServer(project_root=args.project_root)
server.run(mode=args.mode)
if __name__ == "__main__":
main()
# Gemini MCP Server Requirements
mcp>=0.9.0
fastapi>=0.104.0
uvicorn>=0.24.0
pydantic>=2.0.0
python-multipart>=0.0.6
#!/bin/bash
#
# Start script for Gemini MCP Server
# Supports both stdio and HTTP modes
#
# Default values
MODE="stdio"
PROJECT_ROOT="."
PORT="8006"
# Parse command line arguments
while [[ $# -gt 0 ]]; do
case $1 in
--mode)
MODE="$2"
shift 2
;;
--project-root)
PROJECT_ROOT="$2"
shift 2
;;
--port)
PORT="$2"
shift 2
;;
--help)
echo "Usage: $0 [options]"
echo "Options:"
echo " --mode <stdio|http> Server mode (default: stdio)"
echo " --project-root <path> Project root directory (default: .)"
echo " --port <port> Port for HTTP mode (default: 8006)"
echo " --help Show this help message"
exit 0
;;
*)
echo "Unknown option: $1"
exit 1
;;
esac
done
# Check if running in container
if [ -f /.dockerenv ] || [ -n "$CONTAINER_ENV" ]; then
echo "ERROR: Gemini MCP Server cannot run inside a container!"
echo "The Gemini CLI requires Docker access and must run on the host system."
echo "Please run this script on your host machine."
exit 1
fi
# Check if Python is available
if ! command -v python &> /dev/null && ! command -v python3 &> /dev/null; then
echo "ERROR: Python is not installed or not in PATH"
exit 1
fi
# Use python3 if available, otherwise python
PYTHON_CMD="python"
if command -v python3 &> /dev/null; then
PYTHON_CMD="python3"
fi
# Get the directory where this script is located
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
# Change to script directory
cd "$SCRIPT_DIR"
# Start the server
echo "Starting Gemini MCP Server..."
echo "Mode: $MODE"
echo "Project Root: $PROJECT_ROOT"
if [ "$MODE" = "http" ]; then
echo "Port: $PORT"
export UVICORN_PORT=$PORT
fi
# Run the server
exec $PYTHON_CMD gemini_mcp_server.py --mode "$MODE" --project-root "$PROJECT_ROOT"
#!/usr/bin/env python3
"""
Test script for Gemini MCP Server
Tests both HTTP and stdio modes
"""
import argparse
import asyncio
import json
import sys
from typing import Any, Dict
import httpx
async def test_http_mode(base_url: str = "http://localhost:8006"):
"""Test the HTTP mode of the server"""
print("Testing HTTP mode...")
async with httpx.AsyncClient() as client:
# Test health endpoint
print("\n1. Testing health check...")
response = await client.get(f"{base_url}/health")
if response.status_code == 200:
print(f"✅ Health check passed: {response.json()}")
else:
print(f"❌ Health check failed: {response.status_code}")
# Test tools listing
print("\n2. Testing tools listing...")
response = await client.get(f"{base_url}/mcp/tools")
if response.status_code == 200:
tools = response.json()
print(f"✅ Found {len(tools['tools'])} tools:")
for tool in tools['tools']:
print(f" - {tool['name']}: {tool['description']}")
else:
print(f"❌ Tools listing failed: {response.status_code}")
# Test Gemini status
print("\n3. Testing Gemini status...")
response = await client.post(
f"{base_url}/mcp/execute",
json={"tool": "gemini_status"}
)
if response.status_code == 200:
result = response.json()
if result['success']:
print(f"✅ Gemini status: {json.dumps(result['result'], indent=2)}")
else:
print(f"❌ Gemini status failed: {result.get('error')}")
else:
print(f"❌ Status request failed: {response.status_code}")
# Test simple consultation
print("\n4. Testing Gemini consultation...")
response = await client.post(
f"{base_url}/mcp/execute",
json={
"tool": "consult_gemini",
"arguments": {
"query": "What is 2 + 2?",
"force": True
}
}
)
if response.status_code == 200:
result = response.json()
if result['success']:
print(f"✅ Consultation successful")
print(f" Response: {result['result'][:200]}...")
else:
print(f"❌ Consultation failed: {result.get('error')}")
else:
print(f"❌ Consultation request failed: {response.status_code}")
async def test_stdio_mode():
"""Test the stdio mode of the server"""
print("Testing stdio mode...")
print("\n⚠️ Note: stdio mode requires manual testing")
print("To test stdio mode:")
print("1. Run: python gemini_mcp_server.py --mode stdio")
print("2. Send JSON-RPC messages via stdin")
print("3. Example initialization message:")
init_message = {
"jsonrpc": "2.0",
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"clientInfo": {
"name": "test-client",
"version": "1.0.0"
}
},
"id": 1
}
print(json.dumps(init_message, indent=2))
print("\n4. Then send tools/list request:")
list_message = {
"jsonrpc": "2.0",
"method": "tools/list",
"params": {},
"id": 2
}
print(json.dumps(list_message, indent=2))
async def test_mcp_protocol(base_url: str = "http://localhost:8006"):
"""Test MCP protocol endpoints"""
print("\nTesting MCP Protocol endpoints...")
async with httpx.AsyncClient() as client:
# Test messages endpoint
print("\n1. Testing /messages endpoint...")
# Initialize session
init_request = {
"jsonrpc": "2.0",
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"clientInfo": {
"name": "test-client",
"version": "1.0.0"
}
},
"id": 1
}
response = await client.post(
f"{base_url}/messages",
json=init_request,
headers={"Content-Type": "application/json"}
)
if response.status_code == 200:
result = response.json()
print(f"✅ Initialization successful: {json.dumps(result, indent=2)}")
session_id = response.headers.get("Mcp-Session-Id")
print(f" Session ID: {session_id}")
# List tools
print("\n2. Listing tools via MCP protocol...")
list_request = {
"jsonrpc": "2.0",
"method": "tools/list",
"params": {},
"id": 2
}
response = await client.post(
f"{base_url}/messages",
json=list_request,
headers={
"Content-Type": "application/json",
"Mcp-Session-Id": session_id
}
)
if response.status_code == 200:
result = response.json()
print(f"✅ Tools listed: {len(result['result']['tools'])} tools available")
else:
print(f"❌ Tools listing failed: {response.status_code}")
else:
print(f"❌ Initialization failed: {response.status_code}")
async def main():
parser = argparse.ArgumentParser(description="Test Gemini MCP Server")
parser.add_argument(
"--mode",
choices=["http", "stdio", "both"],
default="http",
help="Test mode"
)
parser.add_argument(
"--url",
default="http://localhost:8006",
help="Server URL for HTTP mode"
)
args = parser.parse_args()
print("🧪 Gemini MCP Server Test Suite")
print("=" * 40)
if args.mode in ["http", "both"]:
try:
await test_http_mode(args.url)
await test_mcp_protocol(args.url)
except httpx.ConnectError:
print(f"\n❌ Could not connect to server at {args.url}")
print(" Make sure the server is running: python gemini_mcp_server.py --mode http")
sys.exit(1)
if args.mode in ["stdio", "both"]:
await test_stdio_mode()
print("\n✅ Test suite completed!")
if __name__ == "__main__":
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment