Last active
May 29, 2025 04:11
-
-
Save fr0gger/1731d89a02d08a1bc9a00982c02e2f44 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
MCP Scanner | |
Author: Thomas Roccia | @fr0gger_ | |
Packages to install: | |
- requests | |
- httpx | |
- mcp | |
""" | |
import sys | |
import asyncio | |
import json | |
import argparse | |
from typing import Optional, Dict, List, Any | |
import requests | |
from urllib.parse import urlparse | |
from httpx import HTTPStatusError | |
from mcp import ClientSession | |
from mcp.client.sse import sse_client | |
# SSL warnings | |
import urllib3 | |
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) | |
class MCPToolAnalyzer: | |
def __init__(self, target_url: str, token: Optional[str] = None, | |
timeout: float = 5.0, verbose: bool = False, | |
output_file: str = "mcp_tools_report.json"): | |
self.target_url = target_url.rstrip("/") | |
self.token = token | |
self.timeout = timeout | |
self.verbose = verbose | |
self.output_file = output_file | |
self.results = { | |
"endpoints": [], | |
"tools": [] | |
} | |
def log(self, level: str, message: str): | |
"""Logging function with different levels.""" | |
levels = { | |
"info": "[*]", | |
"success": "[+]", | |
"error": "[-]", | |
"verbose": "[v]", | |
} | |
prefix = levels.get(level, "[*]") | |
if level != "verbose" or (level == "verbose" and self.verbose): | |
print(f"{prefix} {message}") | |
def find_endpoints(self) -> List[str]: | |
"""Discover potential MCP endpoints.""" | |
base = self.target_url | |
parsed = urlparse(base) | |
# potential endpoint paths | |
candidates = [ | |
base, | |
f"{base}/sse", | |
f"{base}/mcp", | |
f"{base}/api/sse", | |
f"{base}/api/mcp", | |
f"{base}/v1/sse", | |
f"{base}/v1/mcp", | |
f"{base}/model", | |
f"{base}/llm", | |
f"{parsed.scheme}://{parsed.netloc}/sse", | |
f"{parsed.scheme}://{parsed.netloc}/mcp", | |
] | |
candidates = list(set(candidates)) | |
discovered = [] | |
self.log("info", f"Starting endpoint discovery on {base}") | |
for url in candidates: | |
self.log("verbose", f"Probing {url}") | |
if self.probe_sse(url): | |
if url not in discovered: | |
self.log("success", f"Detected SSE endpoint at {url}") | |
discovered.append(url) | |
self.results["endpoints"].append({ | |
"url": url, | |
"type": "sse", | |
}) | |
if not discovered: | |
self.log("error", "No MCP SSE endpoints found") | |
return discovered | |
def probe_sse(self, url: str) -> bool: | |
"""Check if the endpoint speaks SSE (text/event-stream).""" | |
try: | |
headers = { | |
"User-Agent": "MCPToolAnalyzer/1.0", | |
"Authorization": f"Bearer {self.token}" if self.token else "", | |
} | |
# HEAD | |
r = requests.head(url, timeout=self.timeout, allow_redirects=True, | |
headers=headers, verify=False) | |
if "text/event-stream" in r.headers.get("Content-Type", "").lower(): | |
return True | |
# GET | |
g = requests.get(url, timeout=self.timeout, allow_redirects=True, | |
stream=True, headers=headers, verify=False) | |
if "text/event-stream" in g.headers.get("Content-Type", "").lower(): | |
return True | |
return False | |
except Exception as e: | |
if self.verbose: | |
self.log("verbose", f"Error probing {url}: {str(e)}") | |
return False | |
async def analyze_tools(self, url: str): | |
"""Connect to SSE, list and analyze all available tools.""" | |
headers = {"Authorization": f"Bearer {self.token}"} if self.token else {} | |
self.log("info", f"Connecting to SSE at {url}") | |
try: | |
async with sse_client(url=url, headers=headers) as (rs, ws): | |
async with ClientSession(rs, ws) as session: | |
await session.initialize() | |
# Get server capabilities via initialize response if available | |
self.log("info", "Successfully initialized connection to MCP server") | |
# List available tools | |
resp = await session.list_tools() | |
tools = resp.tools | |
self.log("success", f"Found {len(tools)} tools") | |
# Analyze each tool | |
for tool in tools: | |
tool_name = tool.name | |
self.log("info", f"\n=== Tool: {tool_name} ===") | |
# Extract tool details | |
tool_details = { | |
"name": tool_name, | |
"description": getattr(tool, "description", "No description available"), | |
"parameters": {}, | |
"example_usage": {}, | |
"function_signature": "", | |
} | |
# Display description | |
if hasattr(tool, "description") and tool.description: | |
self.log("info", f"Description: {tool.description}") | |
# Analyze parameters | |
if hasattr(tool, "parameters"): | |
params = tool.parameters | |
if params: | |
self.log("info", "Parameters:") | |
if isinstance(params, dict): | |
required_params = params.get("required", []) | |
properties = params.get("properties", {}) | |
for param_name, param_details in properties.items(): | |
if isinstance(param_details, dict): | |
param_type = param_details.get("type", "unknown") | |
param_desc = param_details.get("description", "No description") | |
param_required = "Required" if param_name in required_params else "Optional" | |
self.log("info", f" - {param_name} ({param_type}, {param_required}): {param_desc}") | |
tool_details["parameters"][param_name] = { | |
"type": param_type, | |
"description": param_desc, | |
"required": param_name in required_params, | |
"format": param_details.get("format", None), | |
"default": param_details.get("default", None) | |
} | |
else: | |
self.log("verbose", f" Parameters format not recognized: {type(params)}") | |
try: | |
if tool_details["parameters"]: | |
param_strings = [] | |
for param_name, param_info in tool_details["parameters"].items(): | |
if param_info["required"]: | |
param_strings.append(f"{param_name}: {param_info['type']}") | |
else: | |
default_val = param_info.get("default", "None") | |
param_strings.append(f"{param_name}: {param_info['type']} = {default_val}") | |
func_sig = f"function {tool_name}({', '.join(param_strings)})" | |
tool_details["function_signature"] = func_sig | |
self.log("info", f"Function signature: {func_sig}") | |
except Exception as e: | |
if self.verbose: | |
self.log("verbose", f"Error generating function signature: {e}") | |
if tool_details["parameters"]: | |
example_input = {} | |
for param_name, param_info in tool_details["parameters"].items(): | |
if param_info["required"]: | |
if param_info["type"] == "string": | |
example_input[param_name] = f"example_{param_name}" | |
elif param_info["type"] == "number" or param_info["type"] == "integer": | |
example_input[param_name] = 123 | |
elif param_info["type"] == "boolean": | |
example_input[param_name] = True | |
elif param_info["type"] == "object": | |
example_input[param_name] = {"key": "value"} | |
elif param_info["type"] == "array": | |
example_input[param_name] = ["item1", "item2"] | |
tool_details["example_usage"] = example_input | |
self.log("info", f"Example usage: {json.dumps(example_input, indent=2)}") | |
if self.verbose: | |
try: | |
self.log("verbose", "Attempting to call tool with example input...") | |
result = await session.call_tool(tool_name, example_input) | |
self.log("success", f"Tool call successful") | |
self.log("verbose", f"Response: {result}") | |
tool_details["example_response"] = result | |
except Exception as e: | |
self.log("error", f"Tool call failed: {e}") | |
tool_details["example_response"] = {"error": str(e)} | |
if not any(t["name"] == tool_name for t in self.results["tools"]): | |
self.results["tools"].append(tool_details) | |
except HTTPStatusError as e: | |
self.log("error", f"SSE error: {e.response.status_code} {e.response.reason_phrase}") | |
except Exception as e: | |
self.log("error", f"SSE error: {e}") | |
async def analyze(self): | |
"""Main analysis function.""" | |
self.log("info", f"Starting MCP tool analysis for {self.target_url}") | |
endpoints = self.find_endpoints() | |
if not endpoints: | |
return | |
analyzed_endpoints = set() | |
for url in endpoints: | |
if url not in analyzed_endpoints: | |
await self.analyze_tools(url) | |
analyzed_endpoints.add(url) | |
self.generate_report() | |
def generate_report(self): | |
"""Generate a JSON report of findings.""" | |
self.log("info", "\n=== Analysis completed. Generating report...") | |
with open(self.output_file, "w") as f: | |
json.dump(self.results, f, indent=4) | |
self.log("success", f"Report saved to {self.output_file}") | |
self.log("info", f"\n=== SUMMARY ===") | |
self.log("info", f"Endpoints discovered: {len(self.results['endpoints'])}") | |
self.log("info", f"Tools discovered: {len(self.results['tools'])}") | |
# Print tool list | |
if self.results["tools"]: | |
self.log("info", f"\n=== AVAILABLE TOOLS ===") | |
for tool in self.results["tools"]: | |
self.log("success", f"{tool['name']}: {tool['description']}") | |
# Suggest potential usage | |
if tool["parameters"]: | |
param_list = [] | |
for param_name, param_info in tool["parameters"].items(): | |
if param_info["required"]: | |
param_list.append(f"{param_name}=<required>") | |
else: | |
param_list.append(f"{param_name}=<optional>") | |
self.log("info", f" Usage: {tool['name']}({', '.join(param_list)})") | |
def main(): | |
print("=== MCP Scanner by @fr0gger_ ===") | |
parser = argparse.ArgumentParser(description="MCP Scanner") | |
parser.add_argument("url", help="Target URL to analyze") | |
parser.add_argument("-t", "--token", help="Authentication token to use") | |
parser.add_argument("-o", "--output", default="mcp_tools_report.json", help="Output file for analysis results") | |
parser.add_argument("-v", "--verbose", action="store_true", help="Enable verbose output") | |
parser.add_argument("--timeout", type=float, default=5.0, help="Connection timeout in seconds") | |
args = parser.parse_args() | |
print(f"Target: {args.url}") | |
print("") | |
analyzer = MCPToolAnalyzer( | |
args.url, | |
args.token, | |
args.timeout, | |
args.verbose, | |
args.output | |
) | |
try: | |
asyncio.run(analyzer.analyze()) | |
except KeyboardInterrupt: | |
print("\nAnalysis interrupted by user") | |
sys.exit(1) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment