|
#!/usr/bin/env python3 |
|
""" |
|
✅ Method Validator - An AI Agent's Tool for API Discovery and Validation |
|
|
|
This tool helps AI agents quickly discover and validate existing methods in Python packages |
|
before implementing new solutions. It should ONLY be used for analyzing non-standard packages |
|
that are directly relevant to the function being written. |
|
|
|
DO NOT USE FOR: |
|
- Standard library packages (os, sys, json, etc.) |
|
- Well-known utility packages (requests, urllib, etc.) |
|
- Packages that aren't directly related to the function being written |
|
|
|
Usage workflow for AI agents: |
|
1. When writing a function that might already exist in a relevant package: |
|
- Use --list-all to check for existing implementations |
|
2. When using a package-specific method: |
|
- Use --method to verify its exact signature and behavior |
|
3. When handling errors: |
|
- Use --exceptions-only to see package-specific exceptions to handle |
|
""" |
|
|
|
import argparse |
|
import inspect |
|
import importlib |
|
import json |
|
import os |
|
import sys |
|
import re |
|
from functools import lru_cache |
|
from pathlib import Path |
|
from typing import Dict, List, Any, Optional, Tuple, Set |
|
from loguru import logger |
|
|
|
# Standard library packages that should be skipped.
# Used by should_analyze_package() both for exact-name matches and as
# dotted-name prefixes (e.g. "os.path" is skipped because "os" is listed).
# NOTE: deliberately not exhaustive — extend as new stdlib modules come up.
STANDARD_PACKAGES = {
    "abc",
    "argparse",
    "array",
    "ast",
    "asyncio",
    "base64",
    "binascii",
    "builtins",
    "collections",
    "concurrent",
    "contextlib",
    "copy",
    "csv",
    "datetime",
    "decimal",
    "difflib",
    "enum",
    "functools",
    "glob",
    "gzip",
    "hashlib",
    "hmac",
    "html",
    "http",
    "importlib",
    "inspect",
    "io",
    "itertools",
    "json",
    "logging",
    "math",
    "multiprocessing",
    "operator",
    "os",
    "pathlib",
    "pickle",
    "platform",
    "pprint",
    "queue",
    "re",
    "random",
    "shutil",
    "signal",
    "socket",
    "sqlite3",
    "ssl",
    "statistics",
    "string",
    "struct",
    "subprocess",
    "sys",
    "tempfile",
    "threading",
    "time",
    "timeit",
    "typing",
    "unittest",
    "urllib",
    "uuid",
    "warnings",
    "weakref",
    "xml",
    "zipfile",
    "zlib",
}
|
|
|
# Common utility packages that typically don't need analysis.
# Unlike STANDARD_PACKAGES these are matched by exact name only (no
# prefix matching) in should_analyze_package().
COMMON_UTILITY_PACKAGES = {
    "requests",
    "urllib3",
    "six",
    "setuptools",
    "pip",
    "wheel",
    "pkg_resources",
    "pytest",
    "nose",
    "mock",
    "coverage",
    "tox",
    "flake8",
    "pylint",
    "mypy",
    "black",
    "isort",
    "yapf",
    "autopep8",
}
|
|
|
|
|
def should_analyze_package(package_name: str) -> bool:
    """Decide whether *package_name* is worth analyzing.

    Returns ``False`` for:
    - Standard library packages (including dotted submodules such as
      ``os.path``, which share a stdlib top-level name)
    - Common utility packages

    Returns ``True`` for everything else.
    """
    # Exact-name matches against the two skip lists.
    if package_name in STANDARD_PACKAGES:
        return False
    if package_name in COMMON_UTILITY_PACKAGES:
        return False

    # Dotted names rooted in the stdlib (e.g. "os.path") are skipped too.
    for stdlib_name in STANDARD_PACKAGES:
        if package_name.startswith(f"{stdlib_name}."):
            return False

    return True
|
|
|
|
|
def find_venv_path() -> Optional[str]:
    """Find the virtual environment path for the current project.

    Looks for a virtual environment in the following order:

    1. The currently active virtual environment (``sys.prefix`` differs
       from ``sys.base_prefix`` when running inside one).
    2. ``.venv`` in the project root directory (uv's default location).
    3. ``venv`` or ``.env`` in the project root as fallbacks.

    The project root is located by walking up from the current working
    directory until a ``.git`` directory, ``pyproject.toml``, or
    ``setup.py`` is found.

    Returns:
        The virtual environment directory as a string, or ``None`` if no
        virtual environment could be found.
    """
    # Check if running in a virtual environment
    if sys.prefix != sys.base_prefix:
        return sys.prefix

    # Virtualenvs keep their executables under "Scripts" on Windows and
    # "bin" on Unix; compute the name once and use it for every candidate.
    bin_name = "Scripts" if os.name == "nt" else "bin"

    # Get the project root directory (where the .git directory is typically located)
    current_dir = Path.cwd()
    while current_dir.parent != current_dir:
        # Look for common project root indicators
        if (
            (current_dir / ".git").exists()
            or (current_dir / "pyproject.toml").exists()
            or (current_dir / "setup.py").exists()
        ):
            # For uv, check .venv first as it's the default location.
            # BUG FIX: the fallback names previously hard-coded "bin", so
            # "venv"/".env" could never be detected on Windows; all
            # candidates now use the OS-appropriate scripts directory.
            for venv_name in [".venv", "venv", ".env"]:
                venv_dir = current_dir / venv_name
                if venv_dir.exists() and (venv_dir / bin_name).exists():
                    return str(venv_dir)
            break  # If we found project root but no venv, stop searching
        current_dir = current_dir.parent

    return None
|
|
|
|
|
class MethodInfo:
    """Structured container for method information.

    Introspects a callable eagerly in ``__init__`` and stores the results
    as plain attributes so the instance can be serialized with
    :meth:`to_dict`.

    # Attributes (all set in __init__):
    #   name        -- display name, possibly "Class.method"
    #   obj         -- the callable itself
    #   doc         -- cleaned docstring ("" if none)
    #   signature   -- str() of inspect.signature(obj)
    #   module      -- defining module name
    #   summary     -- one-line summary derived from the docstring
    #   parameters  -- dict of relevant parameter name -> details
    #   examples    -- code examples extracted from the docstring
    #   exceptions  -- exceptions documented or raised in source
    #   return_info -- {"type": ..., "description": ...}
    """

    def __init__(self, obj, name: str):
        # Fail fast: a None callable cannot be introspected at all.
        if obj is None:
            raise ValueError(f"Cannot analyze None object for method {name}")

        self.name = name
        self.obj = obj
        try:
            self.doc = inspect.getdoc(obj) or ""
            self.signature = str(inspect.signature(obj))
            self.module = obj.__module__
            self.summary = self._generate_summary()
            self.parameters = self._analyze_parameters()
            self.examples = self._extract_examples()
            self.exceptions = self._analyze_exceptions()
            self.return_info = self._analyze_return_info()
        except Exception as e:
            # Introspection routinely fails for builtins/C extensions
            # (no retrievable signature or source); degrade to empty
            # metadata instead of propagating.
            logger.warning(f"Error analyzing method {name}: {e}")
            # Set default values for failed analysis
            self.doc = ""
            self.signature = "()"
            self.module = obj.__module__ if hasattr(obj, "__module__") else ""
            self.summary = ""
            self.parameters = {}
            self.examples = []
            self.exceptions = []
            self.return_info = {}

    def _generate_summary(self) -> str:
        """Generate a quick summary from the docstring.

        Takes the first sentence of the first line, truncated to 100
        characters (with "..." appended when truncated).
        """
        if not self.doc:
            return ""
        summary = self.doc.split("\n")[0].split(".")[0]
        return summary[:100] + "..." if len(summary) > 100 else summary

    def _analyze_parameters(self) -> Dict[str, Dict[str, Any]]:
        """Analyze parameter types, defaults, and constraints.

        Focuses on the most relevant parameters by considering:
        1. Required parameters
        2. Parameters with good documentation
        3. Parameters commonly used in examples
        4. Parameters that affect core functionality

        Filters out:
        - Internal/private parameters
        - Rarely used optional parameters
        - Debug/development parameters
        - Parameters with default values that rarely need changing
        """
        params = {}
        signature = inspect.signature(self.obj)

        # Extract parameters mentioned in examples (keyword-style "name="
        # occurrences); NOTE: re-runs _extract_examples() because this is
        # called before self.examples is assigned in __init__.
        example_params = set()
        for example in self._extract_examples():
            param_matches = re.finditer(r"(\w+)\s*=", example)
            example_params.update(match.group(1) for match in param_matches)

        # Common parameter name patterns to exclude
        exclude_patterns = {
            r"^_",  # Private params
            r"debug$",  # Debug flags
            r"verbose$",  # Verbosity flags
            r"(callback|hook)$",  # Advanced callback params
            r"experimental",  # Experimental features
            r"internal",  # Internal use params
            r"deprecated",  # Deprecated params
        }

        for name, param in signature.parameters.items():
            # Skip excluded parameters
            if any(re.search(pattern, name) for pattern in exclude_patterns):
                continue

            # Calculate parameter relevance score
            relevance = 0
            if param.default == param.empty:  # Required param
                relevance += 3
            if name in example_params:  # Used in examples
                relevance += 2
            if self._find_param_description(name):  # Has documentation
                relevance += 1

            # Include parameter if it's relevant enough
            if relevance >= 2 or name in {
                "model",
                "messages",
                "stream",
                "api_key",
            }:  # Always include core params (LLM-API-centric allowlist)
                params[name] = {
                    "type": (
                        str(param.annotation)
                        if param.annotation != param.empty
                        else None
                    ),
                    "default": (
                        None if param.default == param.empty else str(param.default)
                    ),
                    # *args is never "required" even without a default
                    "required": param.default == param.empty
                    and param.kind != param.VAR_POSITIONAL,
                    "description": self._find_param_description(name),
                }

        return params

    def _find_param_description(self, param_name: str) -> str:
        """Extract parameter description from docstring.

        Supports reST ``:param name:`` style and a loose "Parameters"
        section match; returns "" when nothing is found.
        """
        if not self.doc:
            return ""

        # Look for :param param_name: or Parameters: section
        param_patterns = [
            rf":param {param_name}:\s*([^\n]+)",
            rf"Parameters.*?{param_name}\s*[:-]\s*([^\n]+)",
        ]

        for pattern in param_patterns:
            match = re.search(pattern, self.doc, re.DOTALL | re.IGNORECASE)
            if match:
                return match.group(1).strip()
        return ""

    def _extract_examples(self) -> List[str]:
        """Extract usage examples from docstring.

        Looks for an "Example(s):" section, then collects fenced
        ``` code blocks or 4-space-indented blocks within it.
        """
        if not self.doc:
            return []

        examples = []
        # Look for Examples section or code blocks
        example_section = re.split(r"Examples?[:|-]", self.doc)
        if len(example_section) > 1:
            # Extract code blocks (indented or between ```)
            code_blocks = re.findall(
                r"```(?:python)?\n(.*?)\n```|\n\s{4}(.*?)(?=\n\S)",
                example_section[1],
                re.DOTALL,
            )
            # Each match tuple has exactly one non-empty alternative.
            examples = [block[0] or block[1] for block in code_blocks if any(block)]
        return examples

    def _analyze_exceptions(self) -> List[Dict[str, str]]:
        """Analyze exceptions that can be raised by the method.

        Focuses on the most relevant exceptions by considering:
        1. Explicit raises in docstring or source code
        2. Frequency of use across the codebase
        3. Quality of documentation
        4. Whether it's a custom exception vs generic
        """
        exceptions = []
        # NOTE: without a docstring the source-scan pass below is skipped
        # entirely — only documented callables get exception analysis.
        if not self.doc:
            return exceptions

        # Look for explicitly documented exceptions first (highest priority)
        raise_patterns = [
            r":raises\s+(\w+):\s*([^\n]+)",
            r"Raises:\n(?:\s*-?\s*(\w+):\s*([^\n]+)\n?)*",
        ]

        for pattern in raise_patterns:
            matches = re.finditer(pattern, self.doc, re.MULTILINE)
            for match in matches:
                if len(match.groups()) == 2:
                    exc_name, desc = match.groups()
                    if (
                        desc and len(desc.strip()) > 10
                    ):  # Only include well-documented exceptions
                        exceptions.append(
                            {
                                "type": exc_name,
                                "description": desc.strip(),
                                "hierarchy": self._get_exception_hierarchy(exc_name),
                                "source": "documentation",
                            }
                        )

        # Look for raise statements in source code
        try:
            source = inspect.getsource(self.obj)
            custom_exceptions = set()

            # First pass: identify custom exceptions
            for line in source.split("\n"):
                if "class" in line and "Error" in line and "Exception" in line:
                    match = re.search(r"class\s+(\w+Error)", line)
                    if match:
                        custom_exceptions.add(match.group(1))

            # Second pass: find raise statements
            raise_statements = re.finditer(r"raise\s+(\w+)(?:\(|$)", source)
            for match in raise_statements:
                exc_name = match.group(1)
                # Prioritize custom exceptions and well-known error types
                if (
                    exc_name in custom_exceptions
                    or exc_name.endswith("Error")
                    or exc_name in {"ValueError", "TypeError", "RuntimeError"}
                ):
                    # Don't duplicate exceptions we already found in docstring
                    if not any(e["type"] == exc_name for e in exceptions):
                        exceptions.append(
                            {
                                "type": exc_name,
                                "description": self._infer_exception_description(
                                    exc_name, source
                                ),
                                "hierarchy": self._get_exception_hierarchy(exc_name),
                                "source": "source_code",
                            }
                        )
        except (TypeError, OSError):
            # inspect.getsource fails for builtins/C code — docstring
            # findings above are still returned.
            pass

        return exceptions

    def _infer_exception_description(self, exc_name: str, source: str) -> str:
        """Attempt to infer a meaningful description for an exception from the source code context."""
        # Look for the exception class definition (its docstring, if any)
        class_match = re.search(
            rf'class\s+{exc_name}\s*\([^)]+\):\s*(?:"""|\'\'\')?(.*?)(?:"""|\'\'\')?\s*(?:pass|\n\s*\w+)',
            source,
            re.DOTALL,
        )
        if class_match and class_match.group(1):
            desc = class_match.group(1).strip()
            if desc:
                return desc

        # Look for comments near raise statements
        raise_contexts = re.finditer(
            rf"(?:#[^\n]*\n)*\s*raise\s+{exc_name}\s*\(([^)]*)\)", source
        )
        descriptions = []
        for context in raise_contexts:
            comment_match = re.search(r"#\s*([^\n]+)", context.group(0))
            if comment_match:
                descriptions.append(comment_match.group(1).strip())
            elif context.group(1):  # Use the error message if no comment
                descriptions.append(context.group(1).strip(" '\""))

        if descriptions:
            # Use the most common or first description
            from collections import Counter

            return Counter(descriptions).most_common(1)[0][0]

        return "Found in source code"

    def _get_exception_hierarchy(self, exc_name: str) -> List[str]:
        """Get the exception class hierarchy (most-derived first).

        Resolves *exc_name* in the analyzed object's module, falling back
        to builtins; returns [] when the name does not resolve to an
        exception class.
        """
        try:
            exc_class = getattr(
                sys.modules.get(self.module) or sys.modules["builtins"], exc_name
            )
            if not (
                inspect.isclass(exc_class) and issubclass(exc_class, BaseException)
            ):
                return []

            # Walk the first base of each class up to (but excluding) object.
            hierarchy = []
            current = exc_class
            while current != object:
                hierarchy.append(current.__name__)
                current = current.__bases__[0]
            return hierarchy
        except (AttributeError, TypeError):
            return []

    def _analyze_return_info(self) -> Dict[str, str]:
        """Analyze return type and description."""
        return_info = {
            "type": (
                str(inspect.signature(self.obj).return_annotation)
                if inspect.signature(self.obj).return_annotation
                != inspect.Signature.empty
                else None
            ),
            "description": "",
        }

        if self.doc:
            # Look for :return: or Returns: section
            return_patterns = [r":return:\s*([^\n]+)", r"Returns:\s*([^\n]+)"]
            for pattern in return_patterns:
                match = re.search(pattern, self.doc)
                if match:
                    return_info["description"] = match.group(1).strip()
                    break

        return return_info

    def _categorize_method(self) -> Set[str]:
        """Categorize method based on name and documentation.

        A method can fall into several categories at once (e.g. both
        "read" and "bulk").
        """
        categories = set()

        # Common method categories and their indicators
        categorization_rules = {
            "create": {"create", "insert", "add", "new", "init"},
            "read": {"get", "fetch", "retrieve", "find", "search", "list"},
            "update": {"update", "modify", "change", "set"},
            "delete": {"delete", "remove", "clear"},
            "bulk": {"bulk", "batch", "many", "multiple"},
            "validation": {"validate", "check", "verify", "ensure"},
            "utility": {"format", "convert", "parse", "helper"},
            "error_handling": {"raise", "except", "error", "handle"},
        }

        # Check method name and summary against categories
        # (substring match, so e.g. "getter" also triggers "get")
        method_text = f"{self.name} {self.summary}".lower()
        for category, indicators in categorization_rules.items():
            if any(indicator in method_text for indicator in indicators):
                categories.add(category)

        # Add error_handling category if method has documented exceptions
        if self.exceptions:
            categories.add("error_handling")

        return categories

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for serialization (JSON-safe)."""
        return {
            "name": self.name,
            "doc": self.doc,
            "signature": self.signature,
            "module": self.module,
            "summary": self.summary,
            "parameters": self.parameters,
            "examples": self.examples,
            "exceptions": self.exceptions,
            "return_info": self.return_info,
            "categories": list(self._categorize_method()),
        }
|
|
|
|
|
class MethodAnalyzer:
    """Analyzes and caches package method information for AI agents.

    Caching happens at two levels:
    - a JSON file on disk (``cache_file``) shared across runs, and
    - in-memory caches (``_exception_cache``, ``_analyzed_packages``)
      that live only for this session.
    """

    def __init__(self, cache_file: str = "method_cache.json"):
        self.cache_file = Path(cache_file)
        self._load_cache()
        self._exception_cache = {}
        self._analyzed_packages = set()  # Track which packages we've analyzed

    def _load_cache(self) -> None:
        """Load cached method information from file into ``_file_cache``."""
        if self.cache_file.exists():
            try:
                with open(self.cache_file) as f:
                    self._file_cache = json.load(f)
            except json.JSONDecodeError:
                # A corrupt cache file is silently discarded.
                self._file_cache = {}
        else:
            self._file_cache = {}

    def _save_cache(self) -> None:
        """Save method information to cache file."""
        with open(self.cache_file, "w") as f:
            json.dump(self._file_cache, f, indent=2)

    def get_package_methods(self, package_name: str) -> Dict[str, Any]:
        """Get all methods and their documentation from a package.

        Returns a mapping of method name -> serialized MethodInfo dict
        (class methods are keyed as "ClassName.method"). Returns {} for
        packages that should not be analyzed, that fail to import, or on
        any analysis error.
        """
        if not should_analyze_package(package_name):
            logger.warning(
                f"Skipping analysis of standard/utility package: {package_name}"
            )
            return {}

        # Only analyze each package once per session
        if package_name in self._analyzed_packages:
            logger.info(f"Using cached analysis for package: {package_name}")
            return self._file_cache.get(package_name, {})

        self._analyzed_packages.add(package_name)
        cache_key = f"{package_name}"
        if cache_key in self._file_cache:
            # Disk-cache hit: skip the (slow) import + introspection.
            return self._file_cache[cache_key]

        try:
            package = importlib.import_module(package_name)
            if package is None:
                # Defensive: import_module raises rather than returning
                # None in practice — TODO confirm this branch is needed.
                logger.error(f"Failed to import package {package_name}")
                return {}

            methods = {}

            # Analyze both module-level functions and class methods
            for name, obj in inspect.getmembers(package):
                try:
                    if inspect.isfunction(obj):
                        methods[name] = MethodInfo(obj, name).to_dict()
                    elif inspect.isclass(obj):
                        # Add class methods with class name prefix
                        for method_name, method_obj in inspect.getmembers(
                            obj,
                            predicate=lambda x: inspect.isfunction(x)
                            or inspect.ismethod(x),
                        ):
                            if not method_name.startswith("_"):  # Skip private methods
                                full_name = f"{name}.{method_name}"
                                try:
                                    methods[full_name] = MethodInfo(
                                        method_obj, full_name
                                    ).to_dict()
                                except Exception as e:
                                    logger.warning(
                                        f"Error analyzing method {full_name}: {e}"
                                    )
                except Exception as e:
                    # One bad member must not abort the whole package scan.
                    logger.warning(f"Error analyzing object {name}: {e}")
                    continue

            # Cache the results
            self._file_cache[cache_key] = methods
            self._save_cache()
            return methods

        except ImportError:
            logger.error(f"Package {package_name} not found")
            return {}
        except Exception as e:
            logger.error(f"Error analyzing package: {e}")
            return {}

    def quick_scan(self, package_name: str) -> List[Tuple[str, str, Set[str]]]:
        """Quick scan of available methods with summaries and categories.

        Returns a list of (name, summary, categories) tuples.
        """
        methods = self.get_package_methods(package_name)
        return [
            (name, info["summary"], set(info["categories"]))
            for name, info in methods.items()
        ]

    def deep_analyze(
        self, package_name: str, method_name: str
    ) -> Optional[Dict[str, Any]]:
        """Detailed analysis of a specific method.

        Returns the serialized MethodInfo dict, or None if the method is
        not found in the package.
        """
        methods = self.get_package_methods(package_name)
        return methods.get(method_name)

    def get_package_exceptions(self, package_name: str) -> Dict[str, Dict[str, Any]]:
        """Get all exceptions defined or used in the package with their descriptions.

        Returns a dictionary of exception information:
        {
            "exception_name": {
                "description": "Description of when/why this exception is raised",
                "hierarchy": ["Exception hierarchy from base to specific"],
                "raised_by": ["list of methods that raise this exception"],
                "module": "module where exception is defined",
                "usage_examples": ["examples of when this is raised"]
            }
        }
        """
        # Session-level memoization.
        if package_name in self._exception_cache:
            return self._exception_cache[package_name]

        exceptions = {}
        try:
            package = importlib.import_module(package_name)

            # First, find all exception classes defined in the package
            for name, obj in inspect.getmembers(package, inspect.isclass):
                if issubclass(obj, Exception) and obj.__module__.startswith(
                    package_name
                ):
                    exceptions[name] = {
                        "description": inspect.getdoc(obj)
                        or "No description available",
                        "hierarchy": self._get_exception_hierarchy(obj),
                        "raised_by": [],
                        "module": obj.__module__,
                        "usage_examples": [],
                    }

            # Then analyze all methods to find where exceptions are raised
            methods = self.get_package_methods(package_name)
            for method_name, method_info in methods.items():
                for exc in method_info["exceptions"]:
                    exc_name = exc["type"]
                    if exc_name not in exceptions:
                        # Add exceptions that are used but not defined in the package
                        exceptions[exc_name] = {
                            "description": "External exception",
                            "hierarchy": exc["hierarchy"],
                            "raised_by": [],
                            "module": "unknown",
                            "usage_examples": [],
                        }

                    # Add method to raised_by list
                    exceptions[exc_name]["raised_by"].append(method_name)

                    # Add description if we don't have one
                    if exceptions[exc_name]["description"] == "External exception":
                        exceptions[exc_name]["description"] = exc["description"]

                    # Add usage example if we have context
                    if method_info["examples"]:
                        exceptions[exc_name]["usage_examples"].extend(
                            method_info["examples"]
                        )

            self._exception_cache[package_name] = exceptions
            return exceptions

        except ImportError:
            logger.error(f"Package {package_name} not found")
            return {}
        except Exception as e:
            logger.error(f"Error analyzing package exceptions: {e}")
            return {}

    def get_exception_summary(
        self, package_name: str
    ) -> List[Tuple[str, str, List[str]]]:
        """Get a focused summary of the most relevant exceptions in the package.

        Prioritizes:
        1. Custom exceptions defined by the package
        2. Commonly raised exceptions
        3. Well-documented exceptions
        4. Exceptions that indicate important error conditions

        Filters out:
        - Generic Python exceptions unless commonly used
        - Internal/private exceptions
        - Debug/development exceptions

        Returns at most 10 (name, description, raised_by) tuples, best
        first.
        """
        exceptions = self.get_package_exceptions(package_name)

        # Score exceptions by relevance
        scored_exceptions = []
        for name, info in exceptions.items():
            # Skip internal/private exceptions
            if name.startswith("_"):
                continue

            score = 0
            # Custom package exception
            if info["module"].startswith(package_name):
                score += 3

            # Well documented
            if (
                len(info["description"]) > 20
                and info["description"] != "Found in source code"
            ):
                score += 2

            # Frequently raised (score scales with usage count)
            if len(info["raised_by"]) > 2:
                score += len(info["raised_by"])

            # Has usage examples
            if info["usage_examples"]:
                score += 1

            # Important error types (auth/permission/rate-limit/etc.)
            if any(
                key in name.lower()
                for key in {
                    "auth",
                    "permission",
                    "rate",
                    "timeout",
                    "invalid",
                    "notfound",
                }
            ):
                score += 2

            scored_exceptions.append(
                (name, info["description"], info["raised_by"], score)
            )

        # Sort by score and return top exceptions
        scored_exceptions.sort(key=lambda x: x[3], reverse=True)
        return [
            (name, desc, raised_by)
            for name, desc, raised_by, _ in scored_exceptions[:10]
        ]

    def _get_exception_hierarchy(self, exc_class: Any) -> List[str]:
        """Get the exception class hierarchy (most-derived first).

        Unlike MethodInfo._get_exception_hierarchy, this takes the class
        object itself rather than a name to resolve.
        """
        try:
            if not (
                inspect.isclass(exc_class) and issubclass(exc_class, BaseException)
            ):
                return []

            # Walk the first base of each class up to (but excluding) object.
            hierarchy = []
            current = exc_class
            while current != object:
                hierarchy.append(current.__name__)
                current = current.__bases__[0]
            return hierarchy
        except (AttributeError, TypeError):
            return []
|
|
|
|
|
def should_auto_execute(command: str) -> bool:
    """Return True if *command* is safe to run without user confirmation.

    Whitelists routine git operations, common build/package-manager
    invocations, and this tool's own CLI. Everything else requires
    explicit approval.
    """
    # Patterns of commands that are safe to auto-execute.
    safe_patterns = (
        # Git commands
        r"^git\s+(status|log|diff|branch|checkout|pull|push|fetch|merge|rebase)",
        # Compilation commands
        r"^(gcc|g\+\+|make|cmake|mvn|gradle|pip|npm|yarn|cargo)",
        # Python package analysis (for AI agent use)
        r".*method_validator\.py\s+\w+\s+(--method|--list-all|--exceptions-only)",
    )

    for pattern in safe_patterns:
        if re.match(pattern, command):
            return True
    return False
|
|
|
|
|
def main():
    """CLI entry point with enhanced documentation and workflow support.

    Parses arguments, validates the target package and virtual
    environment, then dispatches to one of three modes:
    --exceptions-only, --list-all (optionally --by-category), or
    --method. With --json, all output is a single JSON object on stdout;
    otherwise results are written through the logger. Exits non-zero on
    validation failure or analysis error.
    """
    parser = argparse.ArgumentParser(
        description="AI Agent's Method Validator - Analyze non-standard packages of interest",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
IMPORTANT: Only use this tool for:
- Non-standard packages directly relevant to your current task
- Packages where you need to understand specific methods or exceptions
- Cases where you're unsure if a function already exists

DO NOT use for standard library or common utility packages.
""",
    )
    parser.add_argument(
        "package", help="Package name to analyze (must be non-standard and relevant)"
    )
    parser.add_argument("--method", help="Method name for detailed analysis")
    parser.add_argument(
        "--list-all", action="store_true", help="Quick scan of all available methods"
    )
    parser.add_argument(
        "--by-category", action="store_true", help="Group methods by category"
    )
    parser.add_argument(
        "--show-exceptions",
        action="store_true",
        help="Show detailed exception information",
    )
    parser.add_argument(
        "--exceptions-only",
        action="store_true",
        help="Show only exception information for the package",
    )
    parser.add_argument(
        "--venv-path",
        help="Virtual environment path (auto-detected if not provided)",
        default=find_venv_path(),
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="Output results in JSON format for machine consumption",
    )

    args = parser.parse_args()

    # Refuse standard/utility packages up front.
    if not should_analyze_package(args.package):
        if args.json:
            print(
                json.dumps(
                    {"error": "Package should not be analyzed", "package": args.package}
                )
            )
            sys.exit(1)
        logger.warning(
            f"Package '{args.package}' is a standard/utility package and should not be analyzed."
        )
        logger.info(
            "This tool is intended for analyzing non-standard packages relevant to your current task."
        )
        sys.exit(1)

    # A virtual environment is required so the analyzed package versions
    # match the project's.
    if not args.venv_path:
        if args.json:
            print(json.dumps({"error": "Virtual environment not found"}))
            sys.exit(1)
        logger.error("Could not find virtual environment path")
        sys.exit(1)

    # Use the virtual environment's Python executable
    # NOTE(review): hard-codes the Unix "bin" layout — on Windows this
    # would be "Scripts"; confirm intended platforms.
    venv_python = Path(args.venv_path) / "bin" / "python"
    if not venv_python.exists():
        if args.json:
            print(
                json.dumps(
                    {"error": "Python executable not found in virtual environment"}
                )
            )
            sys.exit(1)
        logger.error(f"Could not find Python executable in {args.venv_path}")
        sys.exit(1)

    # Set PYTHONPATH to use the virtual environment's packages
    os.environ["PYTHONPATH"] = str(
        Path(args.venv_path)
        / "lib"
        / f"python{sys.version_info.major}.{sys.version_info.minor}"
        / "site-packages"
    )

    try:
        analyzer = MethodAnalyzer()
        # Accumulates output for --json mode; printed once at the end.
        result = {}

        if args.exceptions_only:
            exceptions = analyzer.get_exception_summary(args.package)
            if args.json:
                result["exceptions"] = [
                    {"name": name, "description": description, "raised_by": raised_by}
                    for name, description, raised_by in exceptions
                ]
            else:
                if not exceptions:
                    logger.warning("No exceptions found in package documentation")
                    return

                for exc_name, description, raised_by in exceptions:
                    logger.info(f"\n{exc_name}:")
                    logger.info(f" Description: {description}")
                    if raised_by:
                        logger.info(f" Raised by {len(raised_by)} methods:")
                        # Cap the listing at 5 methods to keep output short.
                        for method in raised_by[:5]:
                            logger.info(f" - {method}")
                        if len(raised_by) > 5:
                            logger.info(f" ... and {len(raised_by) - 5} more")

        elif args.list_all:
            methods = analyzer.quick_scan(args.package)
            if args.json:
                result["methods"] = []
                if args.by_category:
                    # Invert (name -> categories) into category -> [methods];
                    # a method appears under every category it belongs to.
                    by_category = {}
                    for name, summary, categories in methods:
                        for category in categories:
                            by_category.setdefault(category, []).append(
                                {"name": name, "summary": summary}
                            )
                    result["methods_by_category"] = by_category
                else:
                    result["methods"] = [
                        {"name": name, "summary": summary}
                        for name, summary, _ in methods
                    ]
            else:
                if args.by_category:
                    by_category = {}
                    for name, summary, categories in methods:
                        for category in categories:
                            by_category.setdefault(category, []).append((name, summary))

                    for category, methods in by_category.items():
                        logger.info(f"\n[{category.upper()}]")
                        for name, summary in methods:
                            logger.info(f"\n{name}:")
                            if summary:
                                logger.info(f" Summary: {summary}")
                else:
                    for name, summary, _ in methods:
                        logger.info(f"\n{name}:")
                        if summary:
                            logger.info(f" Summary: {summary}")

        elif args.method:
            method_info = analyzer.deep_analyze(args.package, args.method)
            if method_info:
                if args.json:
                    result["method_info"] = method_info
                else:
                    logger.info(f"\nDetailed analysis of '{args.method}':")
                    logger.info(f"Signature: {method_info['signature']}")
                    logger.info(f"Categories: {', '.join(method_info['categories'])}")

                    if method_info["doc"]:
                        logger.info(f"\nDocumentation:\n{method_info['doc']}")

                    logger.info("\nParameters:")
                    for name, details in method_info["parameters"].items():
                        required = "required" if details["required"] else "optional"
                        logger.info(f" {name} ({required}):")
                        if details["type"]:
                            logger.info(f" Type: {details['type']}")
                        if details["description"]:
                            logger.info(f" Description: {details['description']}")
                        if details["default"]:
                            logger.info(f" Default: {details['default']}")

                    if args.show_exceptions and method_info["exceptions"]:
                        logger.info("\nExceptions:")
                        for exc in method_info["exceptions"]:
                            logger.info(f" {exc['type']}:")
                            logger.info(f" Description: {exc['description']}")
                            if exc["hierarchy"]:
                                logger.info(
                                    f" Hierarchy: {' -> '.join(exc['hierarchy'])}"
                                )

                    if method_info["return_info"]["type"]:
                        logger.info(f"\nReturns: {method_info['return_info']['type']}")
                        if method_info["return_info"]["description"]:
                            logger.info(
                                f" {method_info['return_info']['description']}"
                            )

                    if method_info["examples"]:
                        logger.info("\nExamples:")
                        for example in method_info["examples"]:
                            logger.info(f"\n{example}")
            else:
                # Method not found: offer case-insensitive substring matches.
                similar = [
                    name
                    for name, _, _ in analyzer.quick_scan(args.package)
                    if args.method.lower() in name.lower()
                ]
                if args.json:
                    result["error"] = f"Method '{args.method}' not found"
                    if similar:
                        result["similar_methods"] = similar
                else:
                    logger.warning(f"\nMethod '{args.method}' not found.")
                    if similar:
                        logger.info("Similar methods found:")
                        for method in similar:
                            logger.info(f" - {method}")

        else:
            if args.json:
                result["error"] = "No action specified"
            else:
                parser.print_help()

        if args.json:
            print(json.dumps(result))

    except Exception as e:
        # Top-level boundary: report the failure in the requested format
        # and exit non-zero. (SystemExit from the branches above is not an
        # Exception subclass, so the early sys.exit calls pass through.)
        if args.json:
            print(json.dumps({"error": str(e)}))
        else:
            logger.error(f"Error during analysis: {e}")
        sys.exit(1)
|
|
|
|
|
# Standard entry guard: run the CLI only when executed as a script,
# not when imported as a module.
if __name__ == "__main__":
    main()