Skip to content

Instantly share code, notes, and snippets.

@fr0gger
Last active May 29, 2025 04:11
Show Gist options
  • Save fr0gger/1731d89a02d08a1bc9a00982c02e2f44 to your computer and use it in GitHub Desktop.
Save fr0gger/1731d89a02d08a1bc9a00982c02e2f44 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
MCP Scanner
Author: Thomas Roccia | @fr0gger_
Packages to install:
- requests
- httpx
- mcp
"""
import sys
import asyncio
import json
import argparse
from typing import Optional, Dict, List, Any
import requests
from urllib.parse import urlparse
from httpx import HTTPStatusError
from mcp import ClientSession
from mcp.client.sse import sse_client
# Silence urllib3's InsecureRequestWarning: the endpoint probes below deliberately use verify=False.
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class MCPToolAnalyzer:
    """Discover MCP SSE endpoints on a target and catalogue the tools they expose.

    Typical use is ``asyncio.run(MCPToolAnalyzer(url).analyze())``: candidate
    URLs are probed for a ``text/event-stream`` content type, every endpoint
    that answers is queried through an MCP client session, and the findings
    are written to ``output_file`` as JSON.

    Findings accumulate in ``self.results``:
        ``{"endpoints": [{"url", "type"}, ...], "tools": [tool_details, ...]}``
    """

    # Console prefix for each log level; unknown levels fall back to "[*]".
    _LOG_PREFIXES = {
        "info": "[*]",
        "success": "[+]",
        "error": "[-]",
        "verbose": "[v]",
    }

    def __init__(self, target_url: str, token: Optional[str] = None,
                 timeout: float = 5.0, verbose: bool = False,
                 output_file: str = "mcp_tools_report.json"):
        """Store the scan configuration.

        Args:
            target_url: Base URL to scan; trailing slashes are stripped.
            token: Optional bearer token sent with every request.
            timeout: Per-request timeout in seconds.
            verbose: Emit ``[v]`` log lines and try calling each tool with
                generated example input.
            output_file: Path of the JSON report written by ``generate_report``.
        """
        self.target_url = target_url.rstrip("/")
        self.token = token
        self.timeout = timeout
        self.verbose = verbose
        self.output_file = output_file
        self.results: Dict[str, List[Any]] = {
            "endpoints": [],
            "tools": [],
        }

    def log(self, level: str, message: str):
        """Print ``message`` with the prefix for ``level``.

        ``"verbose"`` messages are dropped unless ``self.verbose`` is set.
        """
        if level == "verbose" and not self.verbose:
            return
        prefix = self._LOG_PREFIXES.get(level, "[*]")
        print(f"{prefix} {message}")

    def _auth_headers(self) -> Dict[str, str]:
        """Return the bearer ``Authorization`` header, or no headers without a token."""
        return {"Authorization": f"Bearer {self.token}"} if self.token else {}

    def find_endpoints(self) -> List[str]:
        """Probe well-known paths and return the URLs that speak SSE.

        Each hit is also recorded in ``self.results["endpoints"]``.
        """
        base = self.target_url
        parsed = urlparse(base)
        root = f"{parsed.scheme}://{parsed.netloc}"
        candidates = [
            base,
            f"{base}/sse",
            f"{base}/mcp",
            f"{base}/api/sse",
            f"{base}/api/mcp",
            f"{base}/v1/sse",
            f"{base}/v1/mcp",
            f"{base}/model",
            f"{base}/llm",
            f"{root}/sse",
            f"{root}/mcp",
        ]
        # dict.fromkeys removes duplicates but, unlike set(), keeps the probe
        # order (and therefore the report order) deterministic.
        candidates = list(dict.fromkeys(candidates))

        discovered: List[str] = []
        self.log("info", f"Starting endpoint discovery on {base}")
        for url in candidates:
            self.log("verbose", f"Probing {url}")
            if not self.probe_sse(url):
                continue
            self.log("success", f"Detected SSE endpoint at {url}")
            discovered.append(url)
            self.results["endpoints"].append({
                "url": url,
                "type": "sse",
            })
        if not discovered:
            self.log("error", "No MCP SSE endpoints found")
        return discovered

    @staticmethod
    def _is_event_stream(response: Any) -> bool:
        """Return True when the response's Content-Type is ``text/event-stream``."""
        return "text/event-stream" in response.headers.get("Content-Type", "").lower()

    def probe_sse(self, url: str) -> bool:
        """Check whether ``url`` answers with a ``text/event-stream`` content type.

        A HEAD request is tried first, then a streamed GET. Any failure is
        treated as "not an SSE endpoint" and only reported in verbose mode.
        """
        # Only attach Authorization when a token exists; an empty header value
        # can be rejected by servers that would otherwise answer.
        headers = {"User-Agent": "MCPToolAnalyzer/1.0", **self._auth_headers()}
        try:
            head = requests.head(url, timeout=self.timeout, allow_redirects=True,
                                 headers=headers, verify=False)
            if self._is_event_stream(head):
                return True
            # stream=True avoids reading a (potentially endless) SSE body; the
            # context manager releases the connection once headers are checked.
            with requests.get(url, timeout=self.timeout, allow_redirects=True,
                              stream=True, headers=headers, verify=False) as resp:
                return self._is_event_stream(resp)
        except Exception as e:
            # Best-effort probe: a broken candidate must not abort the scan.
            self.log("verbose", f"Error probing {url}: {str(e)}")
            return False

    @staticmethod
    def _extract_parameters(schema: Any) -> Dict[str, Dict[str, Any]]:
        """Flatten a JSON-Schema style object into ``{param_name: info}``.

        ``info`` holds ``type``, ``description``, ``required``, ``format`` and
        ``default``. Anything that is not a dict, and any property whose
        definition is not a dict, is ignored.
        """
        if not isinstance(schema, dict):
            return {}
        required = schema.get("required") or []
        properties = schema.get("properties") or {}
        if not isinstance(properties, dict):
            return {}
        parameters: Dict[str, Dict[str, Any]] = {}
        for name, spec in properties.items():
            if not isinstance(spec, dict):
                continue
            parameters[name] = {
                "type": spec.get("type", "unknown"),
                "description": spec.get("description", "No description"),
                "required": name in required,
                "format": spec.get("format"),
                "default": spec.get("default"),
            }
        return parameters

    @staticmethod
    def _build_signature(tool_name: str, parameters: Dict[str, Dict[str, Any]]) -> str:
        """Render a human-readable pseudo signature such as ``function f(a: string)``."""
        parts = []
        for name, info in parameters.items():
            if info["required"]:
                parts.append(f"{name}: {info['type']}")
            else:
                parts.append(f"{name}: {info['type']} = {info.get('default')}")
        return f"function {tool_name}({', '.join(parts)})"

    @staticmethod
    def _build_example_input(parameters: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
        """Build placeholder arguments for every required parameter of a known type."""
        example: Dict[str, Any] = {}
        for name, info in parameters.items():
            if not info["required"]:
                continue
            kind = info["type"]
            if kind == "string":
                example[name] = f"example_{name}"
            elif kind in ("number", "integer"):
                example[name] = 123
            elif kind == "boolean":
                example[name] = True
            elif kind == "object":
                example[name] = {"key": "value"}
            elif kind == "array":
                example[name] = ["item1", "item2"]
        return example

    @staticmethod
    def _json_default(value: Any) -> Any:
        """``json`` fallback hook: dump pydantic-style models, stringify the rest."""
        dump = getattr(value, "model_dump", None)
        if callable(dump):
            try:
                return dump(mode="json")
            except TypeError:
                # model_dump() with a different signature; fall back to str().
                pass
        return str(value)

    @classmethod
    def _jsonable(cls, value: Any) -> Any:
        """Return ``value`` reduced to plain JSON types so the report can be written."""
        return json.loads(json.dumps(value, default=cls._json_default))

    def _describe_tool(self, tool: Any) -> Dict[str, Any]:
        """Log one tool's metadata and return its report entry (no network I/O)."""
        tool_name = tool.name
        self.log("info", f"\n=== Tool: {tool_name} ===")

        description = getattr(tool, "description", None)
        details: Dict[str, Any] = {
            "name": tool_name,
            # The attribute may exist but be None, so fall back explicitly.
            "description": description or "No description available",
            "parameters": {},
            "example_usage": {},
            "function_signature": "",
        }
        if description:
            self.log("info", f"Description: {description}")

        # MCP tools publish their JSON schema as `inputSchema`; `parameters`
        # is kept as a fallback for non-standard servers.
        schema = getattr(tool, "inputSchema", None) or getattr(tool, "parameters", None)
        if schema and not isinstance(schema, dict):
            self.log("verbose", f" Parameters format not recognized: {type(schema)}")

        parameters = self._extract_parameters(schema)
        details["parameters"] = parameters
        if not parameters:
            return details

        self.log("info", "Parameters:")
        for name, info in parameters.items():
            requirement = "Required" if info["required"] else "Optional"
            self.log("info", f" - {name} ({info['type']}, {requirement}): {info['description']}")

        signature = self._build_signature(tool_name, parameters)
        details["function_signature"] = signature
        self.log("info", f"Function signature: {signature}")

        example_input = self._build_example_input(parameters)
        details["example_usage"] = example_input
        self.log("info", f"Example usage: {json.dumps(example_input, indent=2)}")
        return details

    async def _try_example_call(self, session: Any, details: Dict[str, Any]):
        """Invoke the tool with its example input and record the outcome in ``details``.

        NOTE: this really executes the remote tool; it only runs in verbose mode.
        """
        self.log("verbose", "Attempting to call tool with example input...")
        try:
            result = await session.call_tool(details["name"], details["example_usage"])
        except Exception as e:
            self.log("error", f"Tool call failed: {e}")
            details["example_response"] = {"error": str(e)}
        else:
            self.log("success", "Tool call successful")
            self.log("verbose", f"Response: {result}")
            # The SDK returns a model object; store plain JSON types so
            # generate_report() can serialise it.
            details["example_response"] = self._jsonable(result)

    async def analyze_tools(self, url: str):
        """Connect to the SSE endpoint at ``url`` and record every tool it lists.

        Connection and protocol errors are logged, not raised.
        """
        headers = self._auth_headers()
        self.log("info", f"Connecting to SSE at {url}")
        try:
            # Honour the user's timeout here too, not only while probing.
            async with sse_client(url=url, headers=headers, timeout=self.timeout) as (rs, ws):
                async with ClientSession(rs, ws) as session:
                    await session.initialize()
                    self.log("info", "Successfully initialized connection to MCP server")
                    resp = await session.list_tools()
                    tools = resp.tools
                    self.log("success", f"Found {len(tools)} tools")
                    for tool in tools:
                        details = self._describe_tool(tool)
                        if self.verbose and details["parameters"]:
                            await self._try_example_call(session, details)
                        # The same tool may be served by several endpoints.
                        if not any(t["name"] == details["name"] for t in self.results["tools"]):
                            self.results["tools"].append(details)
        except HTTPStatusError as e:
            self.log("error", f"SSE error: {e.response.status_code} {e.response.reason_phrase}")
        except Exception as e:
            self.log("error", f"SSE error: {e}")

    async def analyze(self):
        """Run discovery, analyse every endpoint found, then write the report.

        Returns early, without writing a report, when no endpoint is found.
        """
        self.log("info", f"Starting MCP tool analysis for {self.target_url}")
        endpoints = self.find_endpoints()
        if not endpoints:
            return
        for url in endpoints:
            await self.analyze_tools(url)
        self.generate_report()

    def generate_report(self):
        """Write ``self.results`` to ``self.output_file`` as JSON and print a summary."""
        self.log("info", "\n=== Analysis completed. Generating report...")
        with open(self.output_file, "w", encoding="utf-8") as f:
            # default= is a safety net so one odd value cannot lose the report.
            json.dump(self.results, f, indent=4, default=self._json_default)
        self.log("success", f"Report saved to {self.output_file}")
        self.log("info", "\n=== SUMMARY ===")
        self.log("info", f"Endpoints discovered: {len(self.results['endpoints'])}")
        self.log("info", f"Tools discovered: {len(self.results['tools'])}")

        if not self.results["tools"]:
            return
        self.log("info", "\n=== AVAILABLE TOOLS ===")
        for tool in self.results["tools"]:
            self.log("success", f"{tool['name']}: {tool['description']}")
            if tool["parameters"]:
                param_list = [
                    f"{name}=<required>" if info["required"] else f"{name}=<optional>"
                    for name, info in tool["parameters"].items()
                ]
                self.log("info", f" Usage: {tool['name']}({', '.join(param_list)})")
def main():
    """Command-line entry point: parse arguments and run one analysis.

    Exits with status 1 if the run is interrupted with Ctrl-C.
    """
    print("=== MCP Scanner by @fr0gger_ ===")

    cli = argparse.ArgumentParser(description="MCP Scanner")
    cli.add_argument("url", help="Target URL to analyze")
    cli.add_argument("-t", "--token", help="Authentication token to use")
    cli.add_argument(
        "-o", "--output",
        default="mcp_tools_report.json",
        help="Output file for analysis results",
    )
    cli.add_argument("-v", "--verbose", action="store_true", help="Enable verbose output")
    cli.add_argument(
        "--timeout",
        type=float,
        default=5.0,
        help="Connection timeout in seconds",
    )
    opts = cli.parse_args()

    print(f"Target: {opts.url}")
    print("")

    scanner = MCPToolAnalyzer(
        target_url=opts.url,
        token=opts.token,
        timeout=opts.timeout,
        verbose=opts.verbose,
        output_file=opts.output,
    )
    try:
        # analyze() is a coroutine, so it needs an event loop to run.
        asyncio.run(scanner.analyze())
    except KeyboardInterrupt:
        print("\nAnalysis interrupted by user")
        sys.exit(1)


# Run the scanner only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment