Created
February 8, 2026 21:34
-
-
Save weshoke/5a0f6f632dd80d34b3c0e5dc867c0d9c to your computer and use it in GitHub Desktop.
dspy.RLM analyzing a code base with a rules file
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Codebase analyzer using Recursive Language Models (RLM) via DSPy. | |
| Based on: https://kmad.ai/Recursive-Language-Models-Security-Audit | |
| Usage: | |
| python analyze-codebase.py --mode security --output report.md | |
| python analyze-codebase.py --mode documentation --exclude tests,vendor | |
| python analyze-codebase.py --mode quality --max-iterations 50 | |
| """ | |
| import os | |
| from pathlib import Path | |
| from typing import Any | |
| import click | |
| import dspy | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| # ============================================================================ | |
| # Configuration & API Key Management | |
| # ============================================================================ | |
def _get_api_key(model: str) -> str:
    """Return the API key for the provider that *model* is prefixed with.

    The provider is recognised from the leading text of ``model`` and the
    key is read from that provider's environment variable.

    Args:
        model: Model identifier, e.g. ``"anthropic/claude-3-5-sonnet-20241022"``.

    Returns:
        The non-empty value of the provider's environment variable.

    Raises:
        ValueError: If the prefix matches no known provider, or the
            provider's environment variable is unset or empty.
    """
    # Provider prefix -> environment variable holding its key. Prefixes are
    # tried in insertion order.
    env_var_by_prefix = {
        "anthropic": "ANTHROPIC_API_KEY",
        "openai": "OPENAI_API_KEY",
        "gemini": "GEMINI_API_KEY",
        "openrouter": "OPENROUTER_API_KEY",
    }

    for prefix, env_var in env_var_by_prefix.items():
        if not model.startswith(prefix):
            continue
        key = os.getenv(env_var)
        if key:
            return key
        # Provider recognised but no usable key configured.
        raise ValueError(f"{env_var} not found in environment")

    raise ValueError(f"Unknown model provider: {model}")
def _config_dspy_lm(model: str, max_tokens: int) -> dspy.LM:
    """Build a ``dspy.LM`` for *model* using the provider key from the environment.

    Args:
        model: Provider-prefixed model identifier; also used to pick the API key.
        max_tokens: Forwarded to ``dspy.LM`` as ``max_tokens``.

    Returns:
        The constructed ``dspy.LM`` instance.

    Raises:
        ValueError: Propagated from ``_get_api_key`` when the provider is
            unknown or its key is missing.
    """
    # Resolve the key first so a missing key fails before the LM is built.
    lm_kwargs = {
        "model": model,
        "api_key": _get_api_key(model),
        "max_tokens": max_tokens,
    }
    return dspy.LM(**lm_kwargs)
| # ============================================================================ | |
| # Source Tree Loading | |
| # ============================================================================ | |
| def load_project_rules(rules_path: str | Path | None = None) -> str | None: | |
| """ | |
| Load project-specific implementation rules. | |
| Args: | |
| rules_path: Path to rules file (e.g., 'dev/IMPLEMENTATION_RULES.md') | |
| Returns: | |
| Rules content as string, or None if not found | |
| """ | |
| if rules_path is None: | |
| # Try common locations | |
| possible_paths = [ | |
| Path("dev/IMPLEMENTATION_RULES.md"), | |
| Path("IMPLEMENTATION_RULES.md"), | |
| Path("docs/IMPLEMENTATION_RULES.md"), | |
| Path(".github/IMPLEMENTATION_RULES.md"), | |
| ] | |
| for path in possible_paths: | |
| if path.exists(): | |
| rules_path = path | |
| break | |
| else: | |
| return None | |
| rules_path = Path(rules_path) | |
| if not rules_path.exists(): | |
| return None | |
| try: | |
| return rules_path.read_text(encoding="utf-8") | |
| except Exception as e: | |
| click.echo(f"Warning: Could not read rules file: {e}", err=True) | |
| return None | |
| def load_source_tree( | |
| root_dir: str | Path, | |
| exclude_dirs: set[str] | None = None, | |
| exclude_extensions: set[str] | None = None, | |
| max_file_size: int = 1_000_000, # 1MB default | |
| ) -> dict[str, Any]: | |
| """ | |
| Recursively load folder structure into a nested dict. | |
| Args: | |
| root_dir: Root directory to scan | |
| exclude_dirs: Set of directory names to skip (e.g., {'node_modules', '.git'}) | |
| exclude_extensions: Set of file extensions to skip (e.g., {'.pyc', '.so'}) | |
| max_file_size: Maximum file size in bytes to include | |
| Returns: | |
| Nested dictionary: folders -> files -> content | |
| """ | |
| if exclude_dirs is None: | |
| exclude_dirs = { | |
| ".git", | |
| ".hg", | |
| ".svn", | |
| "node_modules", | |
| "__pycache__", | |
| ".pytest_cache", | |
| "build", | |
| "dist", | |
| ".venv", | |
| "venv", | |
| ".cache", | |
| ".tox", | |
| ".mypy_cache", | |
| ".DS_Store", | |
| } | |
| if exclude_extensions is None: | |
| exclude_extensions = { | |
| ".pyc", | |
| ".pyo", | |
| ".so", | |
| ".dylib", | |
| ".dll", | |
| ".o", | |
| ".obj", | |
| ".a", | |
| ".lib", | |
| ".jpg", | |
| ".jpeg", | |
| ".png", | |
| ".gif", | |
| ".ico", | |
| ".mp4", | |
| ".mov", | |
| ".avi", | |
| ".zip", | |
| ".tar", | |
| ".gz", | |
| ".bz2", | |
| ".pdf", | |
| ".doc", | |
| ".docx", | |
| } | |
| root_dir = Path(root_dir) | |
| tree: dict[str, Any] = {} | |
| try: | |
| for entry in sorted(os.listdir(root_dir)): | |
| if entry.startswith(".") and entry not in {".env", ".gitignore"}: | |
| continue | |
| path = root_dir / entry | |
| if path.is_dir(): | |
| if entry in exclude_dirs: | |
| continue | |
| tree[entry] = load_source_tree( | |
| path, exclude_dirs, exclude_extensions, max_file_size | |
| ) | |
| else: | |
| # Check file extension | |
| if path.suffix in exclude_extensions: | |
| continue | |
| # Check file size | |
| try: | |
| if path.stat().st_size > max_file_size: | |
| tree[entry] = f"[File too large: {path.stat().st_size} bytes]" | |
| continue | |
| except OSError: | |
| continue | |
| # Read file content | |
| try: | |
| with open(path, "r", encoding="utf-8", errors="ignore") as f: | |
| tree[entry] = f.read() | |
| except Exception as e: | |
| tree[entry] = f"[Error reading file: {e}]" | |
| except PermissionError: | |
| return {"[Permission Denied]": str(root_dir)} | |
| return tree | |
| # ============================================================================ | |
| # DSPy Signatures for Different Analysis Modes | |
| # ============================================================================ | |
# DSPy signature selected by analyze_codebase for the "security" mode.
# NOTE(review): DSPy presumably uses the class docstring below as the task
# instructions given to the model, so editing it changes behaviour - confirm.
class SecurityAudit(dspy.Signature):
    """
    Review the provided application source code in detail.
    Focus specifically on identifying security vulnerabilities,
    insecure coding patterns, and other areas of concern.
    Check for:
    - Injection vulnerabilities (SQL, command, code)
    - Authentication and authorization issues
    - Sensitive data exposure
    - Insecure configurations
    - Broken access control
    - Vulnerable dependencies
    - Logic vulnerabilities
    If project_rules are provided, use them as additional context for the review.
    """

    # Input: nested mapping of the codebase (analyze_codebase passes the
    # result of load_source_tree).
    source_tree: dict[str, Any] = dspy.InputField()
    # Input: text of the project rules file; analyze_codebase passes "" when
    # no rules were loaded.
    project_rules: str = dspy.InputField(default="")
    # Output: the report text that analyze_codebase reads back as `analysis`.
    analysis: str = dspy.OutputField(
        description="Detailed security audit report in markdown format."
    )
# DSPy signature selected by analyze_codebase for the "documentation" mode.
# NOTE(review): DSPy presumably uses the class docstring below as the task
# instructions given to the model, so editing it changes behaviour - confirm.
class CodeDocumentation(dspy.Signature):
    """
    Analyze the codebase and generate comprehensive technical documentation.
    Include:
    - Project structure and architecture
    - Key components and their relationships
    - API endpoints and interfaces
    - Data models and schemas
    - Build and deployment information
    If project_rules are provided, reference them in the documentation to explain
    design decisions and implementation patterns.
    """

    # Input: nested mapping of the codebase (analyze_codebase passes the
    # result of load_source_tree).
    source_tree: dict[str, Any] = dspy.InputField()
    # Input: text of the project rules file; analyze_codebase passes "" when
    # no rules were loaded.
    project_rules: str = dspy.InputField(default="")
    # Output: unlike the other signatures in this file, the result field is
    # named `documentation`, not `analysis`; analyze_codebase special-cases it.
    documentation: str = dspy.OutputField(description="Generated markdown documentation.")
# DSPy signature selected by analyze_codebase for the "quality" mode.
# NOTE(review): DSPy presumably uses the class docstring below as the task
# instructions given to the model, so editing it changes behaviour - confirm.
class CodeQuality(dspy.Signature):
    """
    Analyze code quality, maintainability, and best practices.
    Focus on:
    - Code organization and structure
    - Naming conventions and clarity
    - Code duplication and complexity
    - Error handling patterns
    - Testing coverage and quality
    - Performance considerations
    - Technical debt and refactoring opportunities
    If project_rules are provided, evaluate compliance with project-specific
    standards and conventions defined in the rules.
    """

    # Input: nested mapping of the codebase (analyze_codebase passes the
    # result of load_source_tree).
    source_tree: dict[str, Any] = dspy.InputField()
    # Input: text of the project rules file; analyze_codebase passes "" when
    # no rules were loaded.
    project_rules: str = dspy.InputField(default="")
    # Output: the report text that analyze_codebase reads back as `analysis`.
    analysis: str = dspy.OutputField(description="Code quality analysis report in markdown format.")
# DSPy signature selected by analyze_codebase for the "architecture" mode.
# NOTE(review): DSPy presumably uses the class docstring below as the task
# instructions given to the model, so editing it changes behaviour - confirm.
class ArchitectureReview(dspy.Signature):
    """
    Analyze the software architecture and design patterns.
    Examine:
    - System architecture and component design
    - Design patterns and their application
    - Separation of concerns
    - Scalability and extensibility
    - Dependencies and coupling
    - Architectural trade-offs and recommendations
    If project_rules are provided, assess how well the architecture aligns
    with the documented principles and patterns.
    """

    # Input: nested mapping of the codebase (analyze_codebase passes the
    # result of load_source_tree).
    source_tree: dict[str, Any] = dspy.InputField()
    # Input: text of the project rules file; analyze_codebase passes "" when
    # no rules were loaded.
    project_rules: str = dspy.InputField(default="")
    # Output: the report text that analyze_codebase reads back as `analysis`.
    analysis: str = dspy.OutputField(description="Architecture review report in markdown format.")
| # ============================================================================ | |
| # Main Analysis Function | |
| # ============================================================================ | |
| def analyze_codebase( | |
| root_dir: str | Path, | |
| mode: str = "security", | |
| max_iterations: int = 35, | |
| output_file: str | None = None, | |
| exclude_dirs: list[str] | None = None, | |
| rules_file: str | Path | None = None, | |
| verbose: bool = True, | |
| ) -> str: | |
| """ | |
| Analyze a codebase using RLM. | |
| Args: | |
| root_dir: Root directory of the codebase | |
| mode: Analysis mode ('security', 'documentation', 'quality', 'architecture') | |
| max_iterations: Maximum RLM iterations | |
| output_file: Optional output file path | |
| exclude_dirs: Additional directories to exclude | |
| rules_file: Path to project implementation rules file | |
| verbose: Show RLM reasoning steps | |
| Returns: | |
| Analysis report as string | |
| """ | |
| # Initialize DSPy | |
| model = os.getenv("DSPY_MODEL", "anthropic/claude-3-5-sonnet-20241022") | |
| sub_model = os.getenv("DSPY_SUB_MODEL", model) # Can use cheaper model for subtasks | |
| max_tokens = int(os.getenv("DSPY_MAX_TOKENS", "16000")) | |
| print(f"Initializing DSPy with model: {model}") | |
| lm = _config_dspy_lm(model, max_tokens) | |
| lm_sub = _config_dspy_lm(sub_model, max_tokens) | |
| dspy.configure(lm=lm) | |
| # Load source tree | |
| print(f"Loading codebase from: {root_dir}") | |
| exclude_set = { | |
| ".git", | |
| "node_modules", | |
| "__pycache__", | |
| "build", | |
| "dist", | |
| ".venv", | |
| "venv", | |
| ".cache", | |
| "vendor", | |
| } | |
| if exclude_dirs: | |
| exclude_set.update(exclude_dirs) | |
| source_tree = load_source_tree(root_dir, exclude_dirs=exclude_set) | |
| # Load project rules if available | |
| project_rules = "" | |
| if rules_file: | |
| click.echo(f"Loading project rules from: {rules_file}") | |
| loaded_rules = load_project_rules(rules_file) | |
| if loaded_rules: | |
| project_rules = loaded_rules | |
| click.secho(f"✓ Loaded {len(project_rules)} characters of rules", fg='green') | |
| else: | |
| click.secho(f"⚠ Could not load rules from {rules_file}", fg='yellow') | |
| else: | |
| # Try to auto-detect rules | |
| loaded_rules = load_project_rules() | |
| if loaded_rules: | |
| project_rules = loaded_rules | |
| click.secho(f"✓ Auto-detected project rules ({len(project_rules)} chars)", fg='green') | |
| # Select signature based on mode | |
| signatures = { | |
| "security": SecurityAudit, | |
| "documentation": CodeDocumentation, | |
| "quality": CodeQuality, | |
| "architecture": ArchitectureReview, | |
| } | |
| if mode not in signatures: | |
| raise ValueError(f"Unknown mode: {mode}. Choose from: {list(signatures.keys())}") | |
| print(f"Running {mode} analysis with RLM (max {max_iterations} iterations)...") | |
| # Create RLM module | |
| analyzer = dspy.RLM( | |
| signatures[mode], | |
| max_iterations=max_iterations, | |
| sub_lm=lm_sub, | |
| verbose=verbose, | |
| ) | |
| # Run analysis | |
| result = analyzer(source_tree=source_tree, project_rules=project_rules) | |
| # Extract result (field name varies by signature) | |
| output_field = "analysis" if mode != "documentation" else "documentation" | |
| report = getattr(result, output_field, str(result)) | |
| # Save to file if requested | |
| if output_file: | |
| output_path = Path(output_file) | |
| output_path.parent.mkdir(parents=True, exist_ok=True) | |
| output_path.write_text(report, encoding="utf-8") | |
| print(f"\nReport saved to: {output_file}") | |
| return report | |
| # ============================================================================ | |
| # CLI Interface | |
| # ============================================================================ | |
@click.command()
@click.option(
    "--root",
    type=click.Path(exists=True, file_okay=False, dir_okay=True),
    default=".",
    help="Root directory of codebase.",
    show_default=True,
)
@click.option(
    "--mode",
    type=click.Choice(
        ["security", "documentation", "quality", "architecture"], case_sensitive=False
    ),
    default="security",
    help="Analysis mode.",
    show_default=True,
)
@click.option(
    "--max-iterations",
    type=int,
    default=35,
    help="Maximum RLM iterations.",
    show_default=True,
)
@click.option(
    "--output",
    "-o",
    type=click.Path(dir_okay=False),
    help="Output file path (if not specified, prints to stdout).",
)
@click.option(
    "--exclude",
    type=str,
    help="Comma-separated list of directories to exclude.",
)
@click.option(
    "--rules",
    "-r",
    type=click.Path(exists=True, dir_okay=False),
    help="Path to project implementation rules file (auto-detects dev/IMPLEMENTATION_RULES.md if not specified).",
)
@click.option(
    "--quiet",
    "-q",
    is_flag=True,
    help="Hide RLM reasoning steps.",
)
@click.version_option(version="1.0.0", prog_name="analyze-codebase")
def main(root, mode, max_iterations, output, exclude, rules, quiet):
    """
    Analyze codebase using Recursive Language Models (RLM).

    Examples:

    \b
    # Security audit of current directory
    python analyze-codebase.py --mode security

    \b
    # Generate documentation with custom output
    python analyze-codebase.py --mode documentation --output docs/architecture.md

    \b
    # Code quality analysis with project rules
    python analyze-codebase.py --mode quality --rules dev/IMPLEMENTATION_RULES.md

    \b
    # Architecture review excluding vendor directory
    python analyze-codebase.py --mode architecture --exclude vendor,tests

    \b
    Project Rules:
    The analyzer can use project-specific implementation rules to provide
    context-aware analysis. Specify with --rules or it will auto-detect
    common locations like dev/IMPLEMENTATION_RULES.md.

    \b
    Environment Variables:
    DSPY_MODEL          Main model (default: anthropic/claude-3-5-sonnet-20241022)
    DSPY_SUB_MODEL      Sub-task model (default: same as DSPY_MODEL)
    DSPY_MAX_TOKENS     Max tokens per request (default: 16000)
    ANTHROPIC_API_KEY   API key for Anthropic models
    OPENAI_API_KEY      API key for OpenAI models
    GEMINI_API_KEY      API key for Gemini models
    OPENROUTER_API_KEY  API key for OpenRouter models
    """
    # Parse exclude dirs. Names are stripped and empty items dropped so that
    # "tests, vendor" or a trailing comma still yield usable directory names;
    # an effectively empty list is treated like no --exclude at all.
    exclude_dirs = None
    if exclude:
        exclude_dirs = [name.strip() for name in exclude.split(",") if name.strip()] or None

    try:
        # Single-step progress bar: it only labels the run and is marked
        # complete once the analysis has returned.
        with click.progressbar(
            length=1,
            label=f"Analyzing codebase ({mode} mode)",
            show_eta=False,
        ) as bar:
            report = analyze_codebase(
                root_dir=root,
                mode=mode.lower(),
                max_iterations=max_iterations,
                output_file=output,
                exclude_dirs=exclude_dirs,
                rules_file=rules,
                verbose=not quiet,
            )
            bar.update(1)

        if not output:
            click.echo("\n" + "=" * 80)
            click.secho("ANALYSIS REPORT", fg="green", bold=True)
            click.echo("=" * 80)
            click.echo(report)
        else:
            click.secho(f"\n✓ Report saved to: {output}", fg="green")
    except Exception as e:
        # Show a short error on stderr, then re-raise so the failure (and
        # traceback) is not hidden from the caller.
        click.secho(f"Error: {e}", fg="red", err=True)
        raise
# Run the click command only when this file is executed as a script, not
# when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment