#!/usr/bin/env python3 """ MCP Scanner Author: Thomas Roccia | @fr0gger_ Packages to install: - requests - httpx - mcp """ import sys import asyncio import json import argparse from typing import Optional, Dict, List, Any import requests from urllib.parse import urlparse from httpx import HTTPStatusError from mcp import ClientSession from mcp.client.sse import sse_client # SSL warnings import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) class MCPToolAnalyzer: def __init__(self, target_url: str, token: Optional[str] = None, timeout: float = 5.0, verbose: bool = False, output_file: str = "mcp_tools_report.json"): self.target_url = target_url.rstrip("/") self.token = token self.timeout = timeout self.verbose = verbose self.output_file = output_file self.results = { "endpoints": [], "tools": [] } def log(self, level: str, message: str): """Logging function with different levels.""" levels = { "info": "[*]", "success": "[+]", "error": "[-]", "verbose": "[v]", } prefix = levels.get(level, "[*]") if level != "verbose" or (level == "verbose" and self.verbose): print(f"{prefix} {message}") def find_endpoints(self) -> List[str]: """Discover potential MCP endpoints.""" base = self.target_url parsed = urlparse(base) # potential endpoint paths candidates = [ base, f"{base}/sse", f"{base}/mcp", f"{base}/api/sse", f"{base}/api/mcp", f"{base}/v1/sse", f"{base}/v1/mcp", f"{base}/model", f"{base}/llm", f"{parsed.scheme}://{parsed.netloc}/sse", f"{parsed.scheme}://{parsed.netloc}/mcp", ] candidates = list(set(candidates)) discovered = [] self.log("info", f"Starting endpoint discovery on {base}") for url in candidates: self.log("verbose", f"Probing {url}") if self.probe_sse(url): if url not in discovered: self.log("success", f"Detected SSE endpoint at {url}") discovered.append(url) self.results["endpoints"].append({ "url": url, "type": "sse", }) if not discovered: self.log("error", "No MCP SSE endpoints found") return discovered def probe_sse(self, url: str) -> bool: """Check if the endpoint speaks SSE (text/event-stream).""" try: headers = { "User-Agent": "MCPToolAnalyzer/1.0", "Authorization": f"Bearer {self.token}" if self.token else "", } # HEAD r = requests.head(url, timeout=self.timeout, allow_redirects=True, headers=headers, verify=False) if "text/event-stream" in r.headers.get("Content-Type", "").lower(): return True # GET g = requests.get(url, timeout=self.timeout, allow_redirects=True, stream=True, headers=headers, verify=False) if "text/event-stream" in g.headers.get("Content-Type", "").lower(): return True return False except Exception as e: if self.verbose: self.log("verbose", f"Error probing {url}: {str(e)}") return False async def analyze_tools(self, url: str): """Connect to SSE, list and analyze all available tools.""" headers = {"Authorization": f"Bearer {self.token}"} if self.token else {} self.log("info", f"Connecting to SSE at {url}") try: async with sse_client(url=url, headers=headers) as (rs, ws): async with ClientSession(rs, ws) as session: await session.initialize() # Get server capabilities via initialize response if available self.log("info", "Successfully initialized connection to MCP server") # List available tools resp = await session.list_tools() tools = resp.tools self.log("success", f"Found {len(tools)} tools") # Analyze each tool for tool in tools: tool_name = tool.name self.log("info", f"\n=== Tool: {tool_name} ===") # Extract tool details tool_details = { "name": tool_name, "description": getattr(tool, "description", "No description available"), "parameters": {}, "example_usage": {}, "function_signature": "", } # Display description if hasattr(tool, "description") and tool.description: self.log("info", f"Description: {tool.description}") # Analyze parameters if hasattr(tool, "parameters"): params = tool.parameters if params: self.log("info", "Parameters:") if isinstance(params, dict): required_params = params.get("required", []) properties = params.get("properties", {}) for param_name, param_details in properties.items(): if isinstance(param_details, dict): param_type = param_details.get("type", "unknown") param_desc = param_details.get("description", "No description") param_required = "Required" if param_name in required_params else "Optional" self.log("info", f" - {param_name} ({param_type}, {param_required}): {param_desc}") tool_details["parameters"][param_name] = { "type": param_type, "description": param_desc, "required": param_name in required_params, "format": param_details.get("format", None), "default": param_details.get("default", None) } else: self.log("verbose", f" Parameters format not recognized: {type(params)}") try: if tool_details["parameters"]: param_strings = [] for param_name, param_info in tool_details["parameters"].items(): if param_info["required"]: param_strings.append(f"{param_name}: {param_info['type']}") else: default_val = param_info.get("default", "None") param_strings.append(f"{param_name}: {param_info['type']} = {default_val}") func_sig = f"function {tool_name}({', '.join(param_strings)})" tool_details["function_signature"] = func_sig self.log("info", f"Function signature: {func_sig}") except Exception as e: if self.verbose: self.log("verbose", f"Error generating function signature: {e}") if tool_details["parameters"]: example_input = {} for param_name, param_info in tool_details["parameters"].items(): if param_info["required"]: if param_info["type"] == "string": example_input[param_name] = f"example_{param_name}" elif param_info["type"] == "number" or param_info["type"] == "integer": example_input[param_name] = 123 elif param_info["type"] == "boolean": example_input[param_name] = True elif param_info["type"] == "object": example_input[param_name] = {"key": "value"} elif param_info["type"] == "array": example_input[param_name] = ["item1", "item2"] tool_details["example_usage"] = example_input self.log("info", f"Example usage: {json.dumps(example_input, indent=2)}") if self.verbose: try: self.log("verbose", "Attempting to call tool with example input...") result = await session.call_tool(tool_name, example_input) self.log("success", f"Tool call successful") self.log("verbose", f"Response: {result}") tool_details["example_response"] = result except Exception as e: self.log("error", f"Tool call failed: {e}") tool_details["example_response"] = {"error": str(e)} if not any(t["name"] == tool_name for t in self.results["tools"]): self.results["tools"].append(tool_details) except HTTPStatusError as e: self.log("error", f"SSE error: {e.response.status_code} {e.response.reason_phrase}") except Exception as e: self.log("error", f"SSE error: {e}") async def analyze(self): """Main analysis function.""" self.log("info", f"Starting MCP tool analysis for {self.target_url}") endpoints = self.find_endpoints() if not endpoints: return analyzed_endpoints = set() for url in endpoints: if url not in analyzed_endpoints: await self.analyze_tools(url) analyzed_endpoints.add(url) self.generate_report() def generate_report(self): """Generate a JSON report of findings.""" self.log("info", "\n=== Analysis completed. Generating report...") with open(self.output_file, "w") as f: json.dump(self.results, f, indent=4) self.log("success", f"Report saved to {self.output_file}") self.log("info", f"\n=== SUMMARY ===") self.log("info", f"Endpoints discovered: {len(self.results['endpoints'])}") self.log("info", f"Tools discovered: {len(self.results['tools'])}") # Print tool list if self.results["tools"]: self.log("info", f"\n=== AVAILABLE TOOLS ===") for tool in self.results["tools"]: self.log("success", f"{tool['name']}: {tool['description']}") # Suggest potential usage if tool["parameters"]: param_list = [] for param_name, param_info in tool["parameters"].items(): if param_info["required"]: param_list.append(f"{param_name}=") else: param_list.append(f"{param_name}=") self.log("info", f" Usage: {tool['name']}({', '.join(param_list)})") def main(): print("=== MCP Scanner by @fr0gger_ ===") parser = argparse.ArgumentParser(description="MCP Scanner") parser.add_argument("url", help="Target URL to analyze") parser.add_argument("-t", "--token", help="Authentication token to use") parser.add_argument("-o", "--output", default="mcp_tools_report.json", help="Output file for analysis results") parser.add_argument("-v", "--verbose", action="store_true", help="Enable verbose output") parser.add_argument("--timeout", type=float, default=5.0, help="Connection timeout in seconds") args = parser.parse_args() print(f"Target: {args.url}") print("") analyzer = MCPToolAnalyzer( args.url, args.token, args.timeout, args.verbose, args.output ) try: asyncio.run(analyzer.analyze()) except KeyboardInterrupt: print("\nAnalysis interrupted by user") sys.exit(1) if __name__ == "__main__": main()