Skip to content

Instantly share code, notes, and snippets.

@weshoke
Created February 8, 2026 21:34
Show Gist options
  • Select an option

  • Save weshoke/5a0f6f632dd80d34b3c0e5dc867c0d9c to your computer and use it in GitHub Desktop.

Select an option

Save weshoke/5a0f6f632dd80d34b3c0e5dc867c0d9c to your computer and use it in GitHub Desktop.
dspy.RLM analyzing a code base with a rules file
#!/usr/bin/env python3
"""
Codebase analyzer using Recursive Language Models (RLM) via DSPy.
Based on: https://kmad.ai/Recursive-Language-Models-Security-Audit
Usage:
python analyze-codebase.py --mode security --output report.md
python analyze-codebase.py --mode documentation --exclude tests,vendor
python analyze-codebase.py --mode quality --max-iterations 50
"""
import os
from pathlib import Path
from typing import Any
import click
import dspy
from dotenv import load_dotenv
# Runs at import time so that variables defined in a local .env file are
# available to the os.getenv() lookups further down (API keys, DSPY_* settings).
load_dotenv()
# ============================================================================
# Configuration & API Key Management
# ============================================================================
def _get_api_key(model: str) -> str:
"""Get API key based on model provider."""
if model.startswith("anthropic"):
api_key = os.getenv("ANTHROPIC_API_KEY")
if not api_key:
raise ValueError("ANTHROPIC_API_KEY not found in environment")
return api_key
elif model.startswith("openai"):
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise ValueError("OPENAI_API_KEY not found in environment")
return api_key
elif model.startswith("gemini"):
api_key = os.getenv("GEMINI_API_KEY")
if not api_key:
raise ValueError("GEMINI_API_KEY not found in environment")
return api_key
elif model.startswith("openrouter"):
api_key = os.getenv("OPENROUTER_API_KEY")
if not api_key:
raise ValueError("OPENROUTER_API_KEY not found in environment")
return api_key
else:
raise ValueError(f"Unknown model provider: {model}")
def _config_dspy_lm(model: str, max_tokens: int) -> dspy.LM:
    """Build a ``dspy.LM`` for *model* using the provider key from the environment.

    Args:
        model: Model identifier; its prefix selects which API key is looked up.
        max_tokens: Value forwarded to ``dspy.LM`` as ``max_tokens``.

    Returns:
        The constructed ``dspy.LM`` instance.

    Raises:
        ValueError: Propagated from ``_get_api_key`` when no key is available.
    """
    lm_kwargs = {
        "model": model,
        "api_key": _get_api_key(model),
        "max_tokens": max_tokens,
    }
    return dspy.LM(**lm_kwargs)
# ============================================================================
# Project Rules & Source Tree Loading
# ============================================================================
def load_project_rules(rules_path: str | Path | None = None) -> str | None:
"""
Load project-specific implementation rules.
Args:
rules_path: Path to rules file (e.g., 'dev/IMPLEMENTATION_RULES.md')
Returns:
Rules content as string, or None if not found
"""
if rules_path is None:
# Try common locations
possible_paths = [
Path("dev/IMPLEMENTATION_RULES.md"),
Path("IMPLEMENTATION_RULES.md"),
Path("docs/IMPLEMENTATION_RULES.md"),
Path(".github/IMPLEMENTATION_RULES.md"),
]
for path in possible_paths:
if path.exists():
rules_path = path
break
else:
return None
rules_path = Path(rules_path)
if not rules_path.exists():
return None
try:
return rules_path.read_text(encoding="utf-8")
except Exception as e:
click.echo(f"Warning: Could not read rules file: {e}", err=True)
return None
def load_source_tree(
root_dir: str | Path,
exclude_dirs: set[str] | None = None,
exclude_extensions: set[str] | None = None,
max_file_size: int = 1_000_000, # 1MB default
) -> dict[str, Any]:
"""
Recursively load folder structure into a nested dict.
Args:
root_dir: Root directory to scan
exclude_dirs: Set of directory names to skip (e.g., {'node_modules', '.git'})
exclude_extensions: Set of file extensions to skip (e.g., {'.pyc', '.so'})
max_file_size: Maximum file size in bytes to include
Returns:
Nested dictionary: folders -> files -> content
"""
if exclude_dirs is None:
exclude_dirs = {
".git",
".hg",
".svn",
"node_modules",
"__pycache__",
".pytest_cache",
"build",
"dist",
".venv",
"venv",
".cache",
".tox",
".mypy_cache",
".DS_Store",
}
if exclude_extensions is None:
exclude_extensions = {
".pyc",
".pyo",
".so",
".dylib",
".dll",
".o",
".obj",
".a",
".lib",
".jpg",
".jpeg",
".png",
".gif",
".ico",
".mp4",
".mov",
".avi",
".zip",
".tar",
".gz",
".bz2",
".pdf",
".doc",
".docx",
}
root_dir = Path(root_dir)
tree: dict[str, Any] = {}
try:
for entry in sorted(os.listdir(root_dir)):
if entry.startswith(".") and entry not in {".env", ".gitignore"}:
continue
path = root_dir / entry
if path.is_dir():
if entry in exclude_dirs:
continue
tree[entry] = load_source_tree(
path, exclude_dirs, exclude_extensions, max_file_size
)
else:
# Check file extension
if path.suffix in exclude_extensions:
continue
# Check file size
try:
if path.stat().st_size > max_file_size:
tree[entry] = f"[File too large: {path.stat().st_size} bytes]"
continue
except OSError:
continue
# Read file content
try:
with open(path, "r", encoding="utf-8", errors="ignore") as f:
tree[entry] = f.read()
except Exception as e:
tree[entry] = f"[Error reading file: {e}]"
except PermissionError:
return {"[Permission Denied]": str(root_dir)}
return tree
# ============================================================================
# DSPy Signatures for Different Analysis Modes
# ============================================================================
class SecurityAudit(dspy.Signature):
    """
    Review the provided application source code in detail.
    Focus specifically on identifying security vulnerabilities,
    insecure coding patterns, and other areas of concern.
    Check for:
    - Injection vulnerabilities (SQL, command, code)
    - Authentication and authorization issues
    - Sensitive data exposure
    - Insecure configurations
    - Broken access control
    - Vulnerable dependencies
    - Logic vulnerabilities
    If project_rules are provided, use them as additional context for the review.
    """

    # NOTE(review): the docstring above is presumably what DSPy sends to the
    # model as task instructions -- treat wording changes as behaviour changes.

    # Nested folder -> file -> content mapping, as built by load_source_tree().
    source_tree: dict[str, Any] = dspy.InputField()
    # Project rules text; analyze_codebase() passes "" when none were loaded.
    project_rules: str = dspy.InputField(default="")
    # Output field read by analyze_codebase() for the "security" mode.
    analysis: str = dspy.OutputField(
        description="Detailed security audit report in markdown format."
    )
class CodeDocumentation(dspy.Signature):
    """
    Analyze the codebase and generate comprehensive technical documentation.
    Include:
    - Project structure and architecture
    - Key components and their relationships
    - API endpoints and interfaces
    - Data models and schemas
    - Build and deployment information
    If project_rules are provided, reference them in the documentation to explain
    design decisions and implementation patterns.
    """

    # NOTE(review): the docstring above is presumably what DSPy sends to the
    # model as task instructions -- treat wording changes as behaviour changes.

    # Nested folder -> file -> content mapping, as built by load_source_tree().
    source_tree: dict[str, Any] = dspy.InputField()
    # Project rules text; analyze_codebase() passes "" when none were loaded.
    project_rules: str = dspy.InputField(default="")
    # Unlike the other signatures, the report field here is named
    # "documentation"; analyze_codebase() selects it by that name.
    documentation: str = dspy.OutputField(description="Generated markdown documentation.")
class CodeQuality(dspy.Signature):
    """
    Analyze code quality, maintainability, and best practices.
    Focus on:
    - Code organization and structure
    - Naming conventions and clarity
    - Code duplication and complexity
    - Error handling patterns
    - Testing coverage and quality
    - Performance considerations
    - Technical debt and refactoring opportunities
    If project_rules are provided, evaluate compliance with project-specific
    standards and conventions defined in the rules.
    """

    # NOTE(review): the docstring above is presumably what DSPy sends to the
    # model as task instructions -- treat wording changes as behaviour changes.

    # Nested folder -> file -> content mapping, as built by load_source_tree().
    source_tree: dict[str, Any] = dspy.InputField()
    # Project rules text; analyze_codebase() passes "" when none were loaded.
    project_rules: str = dspy.InputField(default="")
    # Output field read by analyze_codebase() for the "quality" mode.
    analysis: str = dspy.OutputField(description="Code quality analysis report in markdown format.")
class ArchitectureReview(dspy.Signature):
    """
    Analyze the software architecture and design patterns.
    Examine:
    - System architecture and component design
    - Design patterns and their application
    - Separation of concerns
    - Scalability and extensibility
    - Dependencies and coupling
    - Architectural trade-offs and recommendations
    If project_rules are provided, assess how well the architecture aligns
    with the documented principles and patterns.
    """

    # NOTE(review): the docstring above is presumably what DSPy sends to the
    # model as task instructions -- treat wording changes as behaviour changes.

    # Nested folder -> file -> content mapping, as built by load_source_tree().
    source_tree: dict[str, Any] = dspy.InputField()
    # Project rules text; analyze_codebase() passes "" when none were loaded.
    project_rules: str = dspy.InputField(default="")
    # Output field read by analyze_codebase() for the "architecture" mode.
    analysis: str = dspy.OutputField(description="Architecture review report in markdown format.")
# ============================================================================
# Main Analysis Function
# ============================================================================
def analyze_codebase(
    root_dir: str | Path,
    mode: str = "security",
    max_iterations: int = 35,
    output_file: str | None = None,
    exclude_dirs: list[str] | None = None,
    rules_file: str | Path | None = None,
    verbose: bool = True,
) -> str:
    """
    Analyze a codebase using RLM.

    Model selection is read from the environment: DSPY_MODEL (main model),
    DSPY_SUB_MODEL (sub-task model, defaults to the main model) and
    DSPY_MAX_TOKENS (defaults to 16000).

    Args:
        root_dir: Root directory of the codebase
        mode: Analysis mode ('security', 'documentation', 'quality', 'architecture')
        max_iterations: Maximum RLM iterations
        output_file: Optional output file path; parent directories are created
        exclude_dirs: Additional directories to exclude
        rules_file: Path to project implementation rules file; when omitted,
            load_project_rules() auto-detection is attempted
        verbose: Show RLM reasoning steps

    Returns:
        Analysis report as string

    Raises:
        ValueError: If *mode* is unknown, DSPY_MAX_TOKENS is not an integer,
            or the API key for the selected model is missing.
    """
    # Signature class and the name of its report-carrying output field.
    modes = {
        "security": (SecurityAudit, "analysis"),
        "documentation": (CodeDocumentation, "documentation"),
        "quality": (CodeQuality, "analysis"),
        "architecture": (ArchitectureReview, "analysis"),
    }
    # Validate before any expensive or credential-dependent work so that a
    # bad mode is reported as such rather than as a missing API key.
    if mode not in modes:
        raise ValueError(f"Unknown mode: {mode}. Choose from: {list(modes.keys())}")
    signature, output_field = modes[mode]

    # Initialize DSPy
    model = os.getenv("DSPY_MODEL", "anthropic/claude-3-5-sonnet-20241022")
    sub_model = os.getenv("DSPY_SUB_MODEL", model)  # Can use cheaper model for subtasks
    raw_max_tokens = os.getenv("DSPY_MAX_TOKENS", "16000")
    try:
        max_tokens = int(raw_max_tokens)
    except ValueError:
        raise ValueError(
            f"DSPY_MAX_TOKENS must be an integer, got {raw_max_tokens!r}"
        ) from None

    click.echo(f"Initializing DSPy with model: {model}")
    lm = _config_dspy_lm(model, max_tokens)
    lm_sub = _config_dspy_lm(sub_model, max_tokens)
    dspy.configure(lm=lm)

    # Load source tree
    click.echo(f"Loading codebase from: {root_dir}")
    exclude_set = {
        ".git",
        "node_modules",
        "__pycache__",
        "build",
        "dist",
        ".venv",
        "venv",
        ".cache",
        "vendor",
    }
    if exclude_dirs:
        exclude_set.update(exclude_dirs)
    source_tree = load_source_tree(root_dir, exclude_dirs=exclude_set)

    # Load project rules if available
    project_rules = ""
    if rules_file:
        click.echo(f"Loading project rules from: {rules_file}")
        loaded_rules = load_project_rules(rules_file)
        if loaded_rules:
            project_rules = loaded_rules
            click.secho(f"✓ Loaded {len(project_rules)} characters of rules", fg='green')
        else:
            click.secho(f"⚠ Could not load rules from {rules_file}", fg='yellow')
    else:
        # Try to auto-detect rules
        # NOTE(review): detection is relative to the current working
        # directory, not root_dir -- confirm this is intended.
        loaded_rules = load_project_rules()
        if loaded_rules:
            project_rules = loaded_rules
            click.secho(f"✓ Auto-detected project rules ({len(project_rules)} chars)", fg='green')

    click.echo(f"Running {mode} analysis with RLM (max {max_iterations} iterations)...")

    # Create RLM module
    analyzer = dspy.RLM(
        signature,
        max_iterations=max_iterations,
        sub_lm=lm_sub,
        verbose=verbose,
    )

    # Run analysis
    result = analyzer(source_tree=source_tree, project_rules=project_rules)

    # Fall back to the stringified result if the expected field is absent.
    report = getattr(result, output_field, str(result))

    # Save to file if requested
    if output_file:
        output_path = Path(output_file)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_text(report, encoding="utf-8")
        click.echo(f"\nReport saved to: {output_file}")
    return report
# ============================================================================
# CLI Interface
# ============================================================================
@click.command()
@click.option(
    "--root",
    type=click.Path(exists=True, file_okay=False, dir_okay=True),
    default=".",
    help="Root directory of codebase.",
    show_default=True,
)
@click.option(
    "--mode",
    type=click.Choice(
        ["security", "documentation", "quality", "architecture"], case_sensitive=False
    ),
    default="security",
    help="Analysis mode.",
    show_default=True,
)
@click.option(
    "--max-iterations",
    type=int,
    default=35,
    help="Maximum RLM iterations.",
    show_default=True,
)
@click.option(
    "--output",
    "-o",
    type=click.Path(dir_okay=False),
    help="Output file path (if not specified, prints to stdout).",
)
@click.option(
    "--exclude",
    type=str,
    help="Comma-separated list of directories to exclude.",
)
@click.option(
    "--rules",
    "-r",
    type=click.Path(exists=True, dir_okay=False),
    help="Path to project implementation rules file (auto-detects dev/IMPLEMENTATION_RULES.md if not specified).",
)
@click.option(
    "--quiet",
    "-q",
    is_flag=True,
    help="Hide RLM reasoning steps.",
)
@click.version_option(version="1.0.0", prog_name="analyze-codebase")
def main(root, mode, max_iterations, output, exclude, rules, quiet):
    """
    Analyze codebase using Recursive Language Models (RLM).

    Examples:

    \b
    # Security audit of current directory
    python analyze-codebase.py --mode security

    \b
    # Generate documentation with custom output
    python analyze-codebase.py --mode documentation --output docs/architecture.md

    \b
    # Code quality analysis with project rules
    python analyze-codebase.py --mode quality --rules dev/IMPLEMENTATION_RULES.md

    \b
    # Architecture review excluding vendor directory
    python analyze-codebase.py --mode architecture --exclude vendor,tests

    \b
    Project Rules:
    The analyzer can use project-specific implementation rules to provide
    context-aware analysis. Specify with --rules or it will auto-detect
    common locations like dev/IMPLEMENTATION_RULES.md.

    \b
    Environment Variables:
    DSPY_MODEL          Main model (default: anthropic/claude-3-5-sonnet-20241022)
    DSPY_SUB_MODEL      Sub-task model (default: same as DSPY_MODEL)
    DSPY_MAX_TOKENS     Max tokens per request (default: 16000)
    ANTHROPIC_API_KEY   API key for Anthropic models
    OPENAI_API_KEY      API key for OpenAI models
    GEMINI_API_KEY      API key for Gemini models
    OPENROUTER_API_KEY  API key for OpenRouter models
    """
    # Parse exclude dirs: tolerate "tests, vendor" and stray commas, since
    # names are matched exactly against directory entries.
    exclude_dirs = None
    if exclude:
        exclude_dirs = [name.strip() for name in exclude.split(",") if name.strip()] or None

    try:
        # Single-step bar: the analysis is one opaque call, so it only marks
        # start and completion.
        with click.progressbar(
            length=1,
            label=f"Analyzing codebase ({mode} mode)",
            show_eta=False,
        ) as bar:
            report = analyze_codebase(
                root_dir=root,
                mode=mode.lower(),
                max_iterations=max_iterations,
                output_file=output,
                exclude_dirs=exclude_dirs,
                rules_file=rules,
                verbose=not quiet,
            )
            bar.update(1)

        if not output:
            click.echo("\n" + "=" * 80)
            click.secho("ANALYSIS REPORT", fg="green", bold=True)
            click.echo("=" * 80)
            click.echo(report)
        else:
            click.secho(f"\n✓ Report saved to: {output}", fg="green")
    except Exception as e:
        # Top-level boundary: show a readable message, then re-raise so the
        # process still exits non-zero with the traceback.
        click.secho(f"Error: {e}", fg="red", err=True)
        raise
# Invoke the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment