Deep AST-based static analysis for the Dockercraft package (50-phase checker)
#!/usr/bin/env python3
"""
Deep AST-based static analysis of the dockercraft package.
Designed to catch artifacts from parallel agent edits:
- Duplicate/conflicting definitions across files
- Broken import chains after renames
- Orphaned references to moved/deleted symbols
- Inconsistent class hierarchies
- self.attr usage for attrs never set in __init__
- Methods missing self parameter
- Stale exception types
- Dead code (defined but unreferenced)
- Shadowed imports
- __all__ drift
- Signature mismatches between definition and usage
- And more.
"""
import ast
import os
import sys
import re
import textwrap
from pathlib import Path
from collections import defaultdict, Counter
from typing import Dict, Set, List, Tuple, Optional, Any

PKG_ROOT = Path("/home/secemp9/dockerpy_final/dockerfile-py/dockercraft")

STDLIB_MODULES = set(sys.stdlib_module_names) if hasattr(sys, 'stdlib_module_names') else {
    'abc', 'ast', 'atexit', 'base64', 'builtins', 'collections', 'concurrent',
    'configparser', 'contextlib', 'copy', 'csv', 'dataclasses', 'datetime',
    'decimal', 'difflib', 'email', 'enum', 'fcntl', 'fnmatch', 'fractions',
    'functools', 'gc', 'getpass', 'glob', 'gzip', 'hashlib', 'heapq', 'hmac',
    'html', 'http', 'importlib', 'inspect', 'io', 'itertools', 'json',
    'logging', 'lzma', 'math', 'mmap', 'multiprocessing', 'numbers', 'operator',
    'os', 'pathlib', 'pdb', 'pickle', 'platform', 'pprint', 'queue',
    're', 'secrets', 'select', 'shelve', 'shlex', 'shutil', 'signal',
    'socket', 'sqlite3', 'ssl', 'stat', 'string', 'struct', 'subprocess',
    'sys', 'syslog', 'tarfile', 'tempfile', 'textwrap', 'threading', 'time',
    'token', 'tokenize', 'traceback', 'types', 'typing', 'typing_extensions',
    'unicodedata', 'unittest', 'urllib', 'uuid', 'venv', 'warnings',
    'weakref', 'xml', 'zipfile', 'zipimport', 'zlib',
}
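# (sys.stdlib_module_names only exists on Python 3.10+, hence the hand-maintained
# fallback set above.)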
KNOWN_THIRD_PARTY = {'docker', 'pytest', 'requests', 'yaml', 'toml'}
# __builtins__ is a dict in imported modules but a module in __main__
PYTHON_BUILTINS = set(__builtins__) if isinstance(__builtins__, dict) else set(dir(__builtins__))

# Docker-dependent modules that can't be runtime-imported without the SDK
DOCKER_DEPENDENT = {
    'docker_client', 'docker_cli', 'container_manager', 'context_managers',
    'buildkit_client', 'buildkit_cache', 'buildkit_secrets',
    'multiplatform', 'layer_analyzer', 'monitor', 'events', 'export',
    'testing', 'testing.containers', 'testing.cleanup', 'testing.pytest_plugin',
    'testing.presets', 'testing.presets.databases', 'testing.presets.infrastructure',
    'testing.presets.messaging', 'testing.wait_strategies',
}
# ─────────────────────────────── data structures ────────────────────────────

class ClassInfo:
    """Rich info about a class definition."""
    def __init__(self, name: str, node: ast.ClassDef, file_rel: str):
        self.name = name
        self.node = node
        self.file_rel = file_rel
        self.methods: Dict[str, ast.FunctionDef] = {}
        self.class_attrs: Set[str] = set()
        self.instance_attrs: Set[str] = set()   # attrs set via self.X = ... in __init__
        self.all_self_reads: Set[str] = set()   # all self.X accesses in any method
        self.all_self_writes: Set[str] = set()  # all self.X writes in any method
        self.bases: List[str] = []
        self.is_dataclass = False
        self.dataclass_fields: Set[str] = set()
        self.lineno = node.lineno

    def collect(self):
        """Walk the class node and populate fields."""
        # Bases
        for b in self.node.bases:
            if isinstance(b, ast.Name):
                self.bases.append(b.id)
            elif isinstance(b, ast.Attribute):
                self.bases.append(ast.unparse(b))  # dotted base, e.g. 'abc.ABC' (Python 3.9+)
        # Decorators
        for dec in self.node.decorator_list:
            if isinstance(dec, ast.Name) and dec.id == 'dataclass':
                self.is_dataclass = True
            elif isinstance(dec, ast.Call) and isinstance(dec.func, ast.Name) and dec.func.id == 'dataclass':
                self.is_dataclass = True
        for item in self.node.body:
            if isinstance(item, (ast.FunctionDef, ast.AsyncFunctionDef)):
                self.methods[item.name] = item
                self_param = item.args.args[0].arg if item.args.args else None
                if self_param == 'self':
                    # Conservatively record every self.X access as a "read"
                    # (writes included); callers subtract all_self_writes later
                    for sub in ast.walk(item):
                        if isinstance(sub, ast.Attribute) and isinstance(sub.value, ast.Name) and sub.value.id == 'self':
                            self.all_self_reads.add(sub.attr)
                    # Specifically find writes: self.X = ... and self.X: T = ...
                    for sub in ast.walk(item):
                        if isinstance(sub, ast.Assign):
                            for t in sub.targets:
                                if isinstance(t, ast.Attribute) and isinstance(t.value, ast.Name) and t.value.id == 'self':
                                    self.all_self_writes.add(t.attr)
                                    if item.name == '__init__':
                                        self.instance_attrs.add(t.attr)
                        elif isinstance(sub, ast.AnnAssign):
                            if (isinstance(sub.target, ast.Attribute) and
                                    isinstance(sub.target.value, ast.Name) and
                                    sub.target.value.id == 'self'):
                                self.all_self_writes.add(sub.target.attr)
                                if item.name == '__init__':
                                    self.instance_attrs.add(sub.target.attr)
                        elif isinstance(sub, ast.AugAssign):
                            if isinstance(sub.target, ast.Attribute) and isinstance(sub.target.value, ast.Name) and sub.target.value.id == 'self':
                                self.all_self_writes.add(sub.target.attr)
            elif isinstance(item, ast.Assign):
                for t in item.targets:
                    if isinstance(t, ast.Name):
                        self.class_attrs.add(t.id)
            elif isinstance(item, ast.AnnAssign) and isinstance(item.target, ast.Name):
                self.class_attrs.add(item.target.id)
                if self.is_dataclass:
                    self.dataclass_fields.add(item.target.id)
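
# To illustrate what collect() extracts, a toy class (not from dockercraft):
#
#     class Builder:
#         retries = 3                  # -> class_attrs = {'retries'}
#         def __init__(self):
#             self.image = None        # -> instance_attrs / all_self_writes = {'image'}
#         def run(self):
#             return self.image        # -> all_self_reads includes 'image'
#
# Every self.X access lands in all_self_reads (writes included); phases that
# need true "read but never set" attrs subtract all_self_writes afterwards.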
class ModuleInfo:
    """Collected info about a single .py file."""
    def __init__(self, path: Path):
        self.path = path
        self.rel = str(path.relative_to(PKG_ROOT.parent))
        self.tree: Optional[ast.Module] = None
        self.source: str = ""
        self.parse_error: Optional[str] = None
        # Top-level names
        self.defined_names: Set[str] = set()
        self.defined_name_lines: Dict[str, int] = {}
        # Classes with rich info
        self.classes: Dict[str, ClassInfo] = {}
        # Functions (top-level)
        self.functions: Dict[str, ast.FunctionDef] = {}
        # Imports
        self.imports_from_pkg: List[Tuple[str, str, int, int]] = []  # (module, name, lineno, level)
        self.imports_star: List[Tuple[str, int]] = []
        self.imports_external: List[Tuple[str, int]] = []
        self.imported_names: Dict[str, int] = {}  # name -> lineno (all imported names)
        # __all__
        self.all_exports: Optional[List[str]] = None
        self.all_extend_names: List[str] = []
        # Issues
        self.duplicate_defs: List[Tuple[str, int, int]] = []
# ─────────────────────────────── collectors ─────────────────────────────────

def _register_name(info: ModuleInfo, name: str, lineno: int, first_def_line: dict,
                   in_try_except: bool = False):
    if name in first_def_line and name != '_' and not in_try_except:
        info.duplicate_defs.append((name, first_def_line[name], lineno))
    else:
        first_def_line[name] = lineno
    info.defined_names.add(name)
    if name not in info.defined_name_lines:
        info.defined_name_lines[name] = lineno

def _collect_import(node: ast.ImportFrom, info: ModuleInfo):
    """Register a single ImportFrom node into info."""
    if node.level and node.level > 0:
        mod_name = node.module or ''
        level = node.level
        for alias in node.names:
            actual = alias.asname or alias.name
            if alias.name == '*':
                info.imports_star.append((mod_name, node.lineno))
            else:
                info.imports_from_pkg.append((mod_name, alias.name, node.lineno, level))
                info.defined_names.add(actual)
                info.imported_names[actual] = node.lineno
    elif node.module:
        info.imports_external.append((node.module, node.lineno))
        for alias in node.names:
            actual = alias.asname or alias.name
            if alias.name == '*':
                info.imports_star.append((node.module, node.lineno))
            else:
                # only non-star names become locally bound names
                info.defined_names.add(actual)
                info.imported_names[actual] = node.lineno
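
# Example of what _collect_import records for a hypothetical line
# `from ..core import Builder as B` (level=2, module='core'):
# imports_from_pkg gains ('core', 'Builder', lineno, 2), and 'B' is added to
# both defined_names and imported_names; a `from .core import *` line instead
# lands in imports_star as ('core', lineno).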
def collect_module_info(path: Path) -> ModuleInfo:
    info = ModuleInfo(path)
    try:
        info.source = path.read_text(encoding='utf-8')
        info.tree = ast.parse(info.source, filename=str(path))
    except SyntaxError as e:
        info.parse_error = f"Line {e.lineno}: {e.msg}"
        return info
    first_def_line: Dict[str, int] = {}

    def _walk_top_level(stmts, in_try_except=False):
        for node in stmts:
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                _register_name(info, node.name, node.lineno, first_def_line, in_try_except)
                info.functions[node.name] = node
            elif isinstance(node, ast.ClassDef):
                _register_name(info, node.name, node.lineno, first_def_line, in_try_except)
                ci = ClassInfo(node.name, node, info.rel)
                ci.collect()
                info.classes[node.name] = ci
            elif isinstance(node, ast.Assign):
                for target in node.targets:
                    if isinstance(target, ast.Name):
                        name = target.id
                        if name == '__all__' and isinstance(node.value, ast.List):
                            info.all_exports = [
                                elt.value for elt in node.value.elts
                                if isinstance(elt, ast.Constant) and isinstance(elt.value, str)
                            ]
                        else:
                            _register_name(info, name, node.lineno, first_def_line, in_try_except)
            elif isinstance(node, ast.AnnAssign) and isinstance(node.target, ast.Name):
                _register_name(info, node.target.id, node.lineno, first_def_line, in_try_except)
            elif isinstance(node, ast.Import):
                for alias in node.names:
                    actual = alias.asname or alias.name.split('.')[0]
                    info.defined_names.add(actual)
                    info.imported_names[actual] = node.lineno
                    info.imports_external.append((alias.name, node.lineno))
            elif isinstance(node, ast.ImportFrom):
                _collect_import(node, info)
            elif isinstance(node, ast.Expr) and isinstance(node.value, ast.Call):
                call = node.value
                if (isinstance(call.func, ast.Attribute) and
                        call.func.attr == 'extend' and
                        isinstance(call.func.value, ast.Name) and
                        call.func.value.id == '__all__' and
                        call.args and isinstance(call.args[0], ast.List)):
                    for elt in call.args[0].elts:
                        if isinstance(elt, ast.Constant) and isinstance(elt.value, str):
                            info.all_extend_names.append(elt.value)
            elif isinstance(node, ast.Try):
                _walk_top_level(node.body, in_try_except=True)
                for handler in node.handlers:
                    _walk_top_level(handler.body, in_try_except=True)
                _walk_top_level(node.orelse, in_try_except=True)
                _walk_top_level(node.finalbody, in_try_except=True)
            elif isinstance(node, ast.If):
                _walk_top_level(node.body, in_try_except)
                _walk_top_level(node.orelse, in_try_except)

    _walk_top_level(info.tree.body)
    return info
def resolve_relative_module(from_file: Path, module_name: str, level: int = 1) -> Optional[Path]:
    if not module_name:
        return None
    base_dir = from_file.parent
    for _ in range(level - 1):
        base_dir = base_dir.parent
    parts = module_name.split('.')
    candidate = base_dir / '/'.join(parts)
    if candidate.with_suffix('.py').exists():
        return candidate.with_suffix('.py')
    if (candidate / '__init__.py').exists():
        return candidate / '__init__.py'
    candidate2 = PKG_ROOT / '/'.join(parts)
    if candidate2.with_suffix('.py').exists():
        return candidate2.with_suffix('.py')
    if (candidate2 / '__init__.py').exists():
        return candidate2 / '__init__.py'
    return None
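
# Resolution example (hypothetical layout): for `from ..util import x` written
# in dockercraft/testing/wait.py, level=2 walks base_dir up to dockercraft/,
# so the candidates tried are dockercraft/util.py, then dockercraft/util/__init__.py,
# then the same two under PKG_ROOT as a fallback; None means Phase 2 reports an
# unresolved module.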
def find_all_py_files(root: Path) -> List[Path]:
    result = []
    for dirpath, _, filenames in os.walk(root):
        for fn in sorted(filenames):
            if fn.endswith('.py'):
                result.append(Path(dirpath) / fn)
    return result
# ─────────────────────────────── analysis phases ────────────────────────────

def main():
    errors: List[str] = []
    warnings: List[str] = []
    info_msgs: List[str] = []
    all_files = find_all_py_files(PKG_ROOT)
    print(f"Scanning {len(all_files)} Python files in dockercraft package...\n")

    # ── Phase 1: Parse all files ──
    modules: Dict[str, ModuleInfo] = {}
    file_to_info: Dict[Path, ModuleInfo] = {}
    for fpath in all_files:
        info = collect_module_info(fpath)
        file_to_info[fpath] = info
        rel = fpath.relative_to(PKG_ROOT)
        if rel.name == '__init__.py':
            mod_name = str(rel.parent).replace('/', '.')
            if mod_name == '.':
                mod_name = ''
        else:
            mod_name = str(rel.with_suffix('')).replace('/', '.')
        modules[mod_name] = info
        if info.parse_error:
            errors.append(f"SYNTAX ERROR in {info.rel}: {info.parse_error}")
        for name, first, dup in info.duplicate_defs:
            warnings.append(f"DUPLICATE DEF in {info.rel}: '{name}' at line {first} and line {dup}")
    print("Phase 1: Parse all files")
    print(f"  {len(modules)} modules | {sum(1 for m in modules.values() if m.parse_error)} syntax errors | "
          f"{sum(len(m.duplicate_defs) for m in modules.values())} duplicate defs")

    # Build global class registry: class_name -> list of (mod_name, ClassInfo)
    global_classes: Dict[str, List[Tuple[str, ClassInfo]]] = defaultdict(list)
    for mod_name, info in modules.items():
        for cls_name, ci in info.classes.items():
            global_classes[cls_name].append((mod_name, ci))

    # ── Phase 2: Intra-package imports ──
    print("\nPhase 2: Intra-package imports")
    resolved = unresolved = 0
    for mod_name, info in modules.items():
        for imp_mod, imp_name, lineno, level in info.imports_from_pkg:
            dots = '.' * level
            target_path = resolve_relative_module(info.path, imp_mod, level)
            if target_path is None:
                errors.append(f"UNRESOLVED MODULE {info.rel}:{lineno}: from {dots}{imp_mod} import {imp_name}")
                unresolved += 1; continue
            ti = file_to_info.get(target_path)
            if ti is None:
                errors.append(f"UNRESOLVED MODULE {info.rel}:{lineno}: {target_path} not scanned")
                unresolved += 1; continue
            if ti.imports_star:
                resolved += 1; continue
            if imp_name not in ti.defined_names:
                errors.append(f"MISSING NAME {info.rel}:{lineno}: from {dots}{imp_mod} import {imp_name} — "
                              f"'{imp_name}' not in {ti.rel}")
                unresolved += 1
            else:
                resolved += 1
        for imp_mod, lineno in info.imports_star:
            tp = resolve_relative_module(info.path, imp_mod)
            if tp is None and imp_mod not in STDLIB_MODULES and imp_mod.split('.')[0] not in KNOWN_THIRD_PARTY:
                if imp_mod.startswith('dockercraft'):
                    errors.append(f"UNRESOLVED STAR {info.rel}:{lineno}: from .{imp_mod} import *")
    print(f"  {resolved} resolved | {unresolved} unresolved")
    # ── Phase 3: __all__ exports ──
    print("\nPhase 3: __all__ exports")
    init_info = modules.get('')
    if init_info:
        all_names = list(init_info.all_exports or []) + init_info.all_extend_names
        missing = []
        for name in all_names:
            if name in init_info.defined_names:
                continue
            found = False
            for star_mod, _ in init_info.imports_star:
                sp = resolve_relative_module(init_info.path, star_mod)
                if sp and sp in file_to_info:
                    si = file_to_info[sp]
                    if name in si.defined_names:
                        found = True; break
                    for ns, _ in si.imports_star:
                        np = resolve_relative_module(si.path, ns)
                        if np and np in file_to_info and name in file_to_info[np].defined_names:
                            found = True; break
                    if found: break
            if not found:
                missing.append(name)
                errors.append(f"__all__ GHOST: '{name}' in __all__ but not importable")
        print(f"  {len(all_names)} exports | {len(missing)} missing")
        # Check for duplicates in __all__
        counter = Counter(all_names)
        for name, count in counter.items():
            if count > 1:
                warnings.append(f"DUPLICATE __all__: '{name}' appears {count} times")
    else:
        print("  __init__.py not found!")

    # ── Phase 4: External imports ──
    print("\nPhase 4: External imports")
    ext_ok = ext_warn = 0
    for mod_name, info in modules.items():
        for imp_mod, lineno in info.imports_external:
            top = imp_mod.split('.')[0]
            if top in STDLIB_MODULES or top in KNOWN_THIRD_PARTY or top == 'dockercraft':
                ext_ok += 1
            else:
                try:
                    __import__(top)
                    ext_ok += 1
                except ImportError:
                    warnings.append(f"UNKNOWN IMPORT {info.rel}:{lineno}: '{imp_mod}'")
                    ext_warn += 1
    print(f"  {ext_ok} OK | {ext_warn} unknown")

    # ── Phase 5: Bare except, mutable defaults, print() ──
    print("\nPhase 5: Code smells")
    smells = 0
    for mod_name, info in modules.items():
        if not info.tree: continue
        for node in ast.walk(info.tree):
            if isinstance(node, ast.ExceptHandler) and node.type is None:
                warnings.append(f"BARE EXCEPT {info.rel}:{node.lineno}")
                smells += 1
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                for d in node.args.defaults + node.args.kw_defaults:
                    if d and isinstance(d, (ast.List, ast.Dict, ast.Set)):
                        warnings.append(f"MUTABLE DEFAULT {info.rel}:{node.lineno}: {node.name}()")
                        smells += 1
            if (isinstance(node, ast.Call) and isinstance(node.func, ast.Name) and
                    node.func.id == 'print' and 'cli' not in info.rel and
                    'test' not in info.rel):
                info_msgs.append(f"PRINT() {info.rel}:{node.lineno}")
    print(f"  {smells} smells")

    # ── Phase 6: Circular imports ──
    print("\nPhase 6: Circular imports")
    dep_graph: Dict[str, Set[str]] = defaultdict(set)
    for mod_name, info in modules.items():
        for imp_mod, _, _, _ in info.imports_from_pkg:
            dep_graph[mod_name].add(imp_mod)
        for imp_mod, _ in info.imports_star:
            if resolve_relative_module(info.path, imp_mod):
                dep_graph[mod_name].add(imp_mod)
    WHITE, GRAY, BLACK = 0, 1, 2
    color = {m: WHITE for m in modules}
    cycles = []

    def dfs(u, path):
        color[u] = GRAY; path.append(u)
        for v in dep_graph.get(u, []):
            if v not in color: continue
            if color[v] == GRAY:
                cycles.append(' → '.join(path[path.index(v):] + [v]))
            elif color[v] == WHITE:
                dfs(v, path)
        path.pop(); color[u] = BLACK

    for m in modules:
        if color[m] == WHITE: dfs(m, [])
    for c in cycles:
        warnings.append(f"CIRCULAR IMPORT: {c}")
    print(f"  {len(cycles)} cycles")
    # ── Phase 7: Class hierarchy ──
    print("\nPhase 7: Class hierarchy")
    hier_issues = 0
    BUILTIN_BASES = {'object', 'Exception', 'ValueError', 'TypeError', 'RuntimeError',
                     'OSError', 'IOError', 'ABC', 'IntEnum', 'Flag', 'Enum',
                     'UserDict', 'UserList', 'UserString', 'NamedTuple',
                     'BaseException', 'KeyError', 'AttributeError', 'LookupError',
                     'StopIteration', 'NotImplementedError', 'ArithmeticError',
                     'PermissionError', 'FileNotFoundError', 'ConnectionError',
                     'TimeoutError', 'ProcessLookupError'}
    for cls_name, entries in global_classes.items():
        for mod_name, ci in entries:
            info = modules[mod_name]
            for base_name in ci.bases:
                if '.' in base_name: continue  # dotted base (module.Class), unparsed above
                if base_name in BUILTIN_BASES: continue
                if base_name in info.defined_names: continue
                if base_name in global_classes: continue
                if base_name in PYTHON_BUILTINS: continue
                found = False
                for star_mod, _ in info.imports_star:
                    sp = resolve_relative_module(info.path, star_mod)
                    if sp and sp in file_to_info and base_name in file_to_info[sp].defined_names:
                        found = True; break
                if not found:
                    errors.append(f"UNDEFINED BASE {info.rel}:{ci.lineno}: "
                                  f"class {cls_name}({base_name}) — not found")
                    hier_issues += 1
    print(f"  {sum(len(e) for e in global_classes.values())} classes | {hier_issues} issues")

    # ── Phase 8: Cross-module class name collisions ──
    print("\nPhase 8: Cross-module name collisions")
    collisions = 0
    for cls_name, entries in global_classes.items():
        if len(entries) > 1:
            # Use file paths so the __init__.py filter below actually matches
            locations = [f"{modules[m].rel}:{ci.lineno}" for m, ci in entries]
            # Only flag if they're not all in __init__.py (re-exports are fine)
            non_init = [loc for loc in locations if '__init__' not in loc]
            if len(non_init) > 1:
                warnings.append(f"CLASS COLLISION: '{cls_name}' defined in {', '.join(non_init)}")
                collisions += 1
    # Also check top-level function collisions across modules
    func_locations: Dict[str, List[str]] = defaultdict(list)
    for mod_name, info in modules.items():
        if mod_name == '' or 'testing' in mod_name: continue  # __init__ re-exports are fine
        for fn_name in info.functions:
            if fn_name.startswith('_'): continue  # private
            func_locations[fn_name].append(mod_name)
    for fn_name, locs in func_locations.items():
        if len(locs) > 1:
            info_msgs.append(f"FUNC COLLISION: '{fn_name}' in {', '.join(locs)}")
    print(f"  {collisions} class collisions")
    # ── Phase 9: self.attr reads without writes (agent forgot to init) ──
    print("\nPhase 9: self.attr without __init__")
    attr_issues = 0
    for cls_name, entries in global_classes.items():
        for mod_name, ci in entries:
            if not ci.methods: continue
            # Attrs read but never written anywhere in the class
            all_known = ci.instance_attrs | ci.class_attrs | ci.dataclass_fields | set(ci.methods.keys())
            # Add parent class attrs (basic: one-level lookup)
            for base_name in ci.bases:
                if base_name in global_classes:
                    for _, parent_ci in global_classes[base_name]:
                        all_known |= parent_ci.instance_attrs | parent_ci.class_attrs | parent_ci.dataclass_fields | set(parent_ci.methods.keys())
            reads_without_writes = ci.all_self_reads - ci.all_self_writes - all_known
            # Filter out private/dunder attrs and known dynamic patterns
            reads_without_writes = {a for a in reads_without_writes
                                    if not a.startswith('_')
                                    and a not in ('logger', 'log', 'name', 'id')}
            for attr in sorted(reads_without_writes):
                warnings.append(f"UNSET ATTR {modules[mod_name].rel}: "
                                f"{cls_name}.{attr} read but never set")
                attr_issues += 1
    print(f"  {attr_issues} issues")

    # ── Phase 10: Methods missing self parameter ──
    print("\nPhase 10: Methods missing self/cls")
    missing_self = 0
    for cls_name, entries in global_classes.items():
        for mod_name, ci in entries:
            for meth_name, meth_node in ci.methods.items():
                args = meth_node.args
                is_static = any(
                    (isinstance(d, ast.Name) and d.id == 'staticmethod') or
                    (isinstance(d, ast.Attribute) and d.attr == 'staticmethod')
                    for d in meth_node.decorator_list
                )
                is_classmethod = any(
                    (isinstance(d, ast.Name) and d.id == 'classmethod') or
                    (isinstance(d, ast.Attribute) and d.attr == 'classmethod')
                    for d in meth_node.decorator_list
                )
                if is_static:
                    continue
                if not args.args:
                    errors.append(f"NO SELF {modules[mod_name].rel}:{meth_node.lineno}: "
                                  f"{cls_name}.{meth_name}() has no self/cls parameter")
                    missing_self += 1
                elif is_classmethod and args.args[0].arg != 'cls':
                    warnings.append(f"BAD CLS {modules[mod_name].rel}:{meth_node.lineno}: "
                                    f"{cls_name}.{meth_name}() first param is '{args.args[0].arg}' not 'cls'")
                elif not is_classmethod and args.args[0].arg != 'self' and not meth_name.startswith('__'):
                    # Heuristic: only flag for non-dunder if it looks like a real method
                    info_msgs.append(f"ODD SELF {modules[mod_name].rel}:{meth_node.lineno}: "
                                     f"{cls_name}.{meth_name}() first param is '{args.args[0].arg}'")
    print(f"  {missing_self} errors")
    # ── Phase 11: Unreachable code after return/raise ──
    print("\nPhase 11: Unreachable code")
    unreachable = 0
    for mod_name, info in modules.items():
        if not info.tree: continue
        for node in ast.walk(info.tree):
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                body = node.body
                for i, stmt in enumerate(body[:-1]):
                    if isinstance(stmt, (ast.Return, ast.Raise)):
                        next_stmt = body[i + 1]
                        # A nested def/class after a return can be deliberate; skip those
                        if not isinstance(next_stmt, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
                            warnings.append(f"UNREACHABLE {info.rel}:{next_stmt.lineno}: "
                                            f"code after {'return' if isinstance(stmt, ast.Return) else 'raise'} "
                                            f"in {node.name}()")
                            unreachable += 1
    print(f"  {unreachable} issues")

    # ── Phase 12: Shadowed imports (import X then def X) ──
    print("\nPhase 12: Shadowed imports")
    shadows = 0
    for mod_name, info in modules.items():
        imported = dict(info.imported_names)  # name -> import lineno
        for name, line in info.defined_name_lines.items():
            if name in imported and imported[name] < line and name not in ('__all__',):
                # Genuine shadow: imported then redefined
                if name not in info.classes and name not in info.functions:
                    continue  # Skip non-class/func redefs (often intentional, like constants)
                warnings.append(f"SHADOW {info.rel}:{line}: '{name}' imported at line {imported[name]} "
                                f"then redefined at line {line}")
                shadows += 1
    print(f"  {shadows} shadows")

    # ── Phase 13: Dataclass field consistency ──
    print("\nPhase 13: Dataclass fields")
    dc_issues = 0
    for cls_name, entries in global_classes.items():
        for mod_name, ci in entries:
            if not ci.is_dataclass: continue
            seen = set()
            for stmt in ci.node.body:
                if isinstance(stmt, ast.AnnAssign) and isinstance(stmt.target, ast.Name):
                    fname = stmt.target.id
                    if fname in seen:
                        warnings.append(f"DUPE DC FIELD {modules[mod_name].rel}:{stmt.lineno}: "
                                        f"{cls_name}.{fname}")
                        dc_issues += 1
                    seen.add(fname)
            # Check that non-default fields come before default fields
            has_default = False
            for stmt in ci.node.body:
                if isinstance(stmt, ast.AnnAssign) and isinstance(stmt.target, ast.Name):
                    if stmt.value is not None:
                        has_default = True
                    elif has_default:
                        errors.append(f"DC FIELD ORDER {modules[mod_name].rel}:{stmt.lineno}: "
                                      f"{cls_name}.{stmt.target.id} has no default but follows a field with default")
                        dc_issues += 1
    print(f"  {dc_issues} issues")
    # ── Phase 14: Exception class hierarchy ──
    print("\nPhase 14: Exception hierarchy consistency")
    exc_issues = 0
    EXCEPTION_BASES = {'Exception', 'BaseException', 'ValueError', 'TypeError', 'RuntimeError',
                       'OSError', 'IOError', 'KeyError', 'AttributeError', 'LookupError',
                       'FileNotFoundError', 'PermissionError', 'ConnectionError',
                       'TimeoutError', 'NotImplementedError', 'StopIteration'}
    # Collect all our exception classes
    our_exceptions: Dict[str, ClassInfo] = {}
    for cls_name, entries in global_classes.items():
        for mod_name, ci in entries:
            for base in ci.bases:
                if base in EXCEPTION_BASES or base.endswith('Error') or base.endswith('Exception'):
                    our_exceptions[cls_name] = ci
                    break
    # Verify exception classes used in raise/except exist
    for mod_name, info in modules.items():
        if not info.tree: continue
        for node in ast.walk(info.tree):
            if isinstance(node, ast.Raise) and node.exc:
                exc_call = node.exc
                if isinstance(exc_call, ast.Call) and isinstance(exc_call.func, ast.Name):
                    exc_name = exc_call.func.id
                    if exc_name.endswith('Error') or exc_name.endswith('Exception'):
                        if (exc_name not in info.defined_names and
                                exc_name not in EXCEPTION_BASES and
                                exc_name not in our_exceptions and
                                exc_name not in PYTHON_BUILTINS):
                            errors.append(f"UNKNOWN EXCEPTION {info.rel}:{node.lineno}: "
                                          f"raise {exc_name}() — not imported/defined")
                            exc_issues += 1
            if isinstance(node, ast.ExceptHandler) and node.type:
                if isinstance(node.type, ast.Name):
                    exc_name = node.type.id
                    if exc_name.endswith('Error') or exc_name.endswith('Exception'):
                        if (exc_name not in info.defined_names and
                                exc_name not in EXCEPTION_BASES and
                                exc_name not in our_exceptions and
                                exc_name not in PYTHON_BUILTINS):
                            errors.append(f"UNKNOWN EXCEPT {info.rel}:{node.lineno}: "
                                          f"except {exc_name} — not imported/defined")
                            exc_issues += 1
    print(f"  {len(our_exceptions)} custom exceptions | {exc_issues} issues")

    # ── Phase 15: Stale string references to class/function names ──
    print("\nPhase 15: Stale string references")
    stale = 0
    # Collect all public names from the package
    all_public: Set[str] = set()
    for mod_name, info in modules.items():
        for name in info.defined_names:
            if not name.startswith('_'):
                all_public.add(name)
    # Check for string literals that look like they reference internal names but are misspelled.
    # Focus on getattr/hasattr calls with string args
    for mod_name, info in modules.items():
        if not info.tree: continue
        for node in ast.walk(info.tree):
            if (isinstance(node, ast.Call) and isinstance(node.func, ast.Name) and
                    node.func.id in ('getattr', 'hasattr', 'setattr') and
                    len(node.args) >= 2 and isinstance(node.args[1], ast.Constant) and
                    isinstance(node.args[1].value, str)):
                attr_str = node.args[1].value
                # Check if it looks like a method name reference
                if attr_str.startswith('_parse_') or attr_str.startswith('_fix_'):
                    # These are dynamic dispatch patterns — check they exist in some class
                    found = False
                    for cls_entries in global_classes.values():
                        for _, ci in cls_entries:
                            if attr_str in ci.methods:
                                found = True; break
                        if found: break
                    if not found and attr_str not in all_public:
                        info_msgs.append(f"STALE REF? {info.rel}:{node.lineno}: "
                                         f"{node.func.id}(..., '{attr_str}') — not found in any class")
                        stale += 1
    print(f"  {stale} suspicious refs")
    # ── Phase 16: Function signature consistency across overrides ──
    print("\nPhase 16: Method override signatures")
    override_issues = 0
    for cls_name, entries in global_classes.items():
        for mod_name, ci in entries:
            for base_name in ci.bases:
                if base_name not in global_classes: continue
                for _, parent_ci in global_classes[base_name]:
                    for meth_name, meth_node in ci.methods.items():
                        if meth_name.startswith('_') and not meth_name.startswith('__'): continue
                        if meth_name not in parent_ci.methods: continue
                        parent_meth = parent_ci.methods[meth_name]
                        # Compare arg counts (ignoring self)
                        child_args = len(meth_node.args.args)
                        parent_args = len(parent_meth.args.args)
                        child_has_var = meth_node.args.vararg is not None
                        parent_has_var = parent_meth.args.vararg is not None
                        child_has_kw = meth_node.args.kwarg is not None
                        parent_has_kw = parent_meth.args.kwarg is not None
                        if child_has_var or parent_has_var or child_has_kw or parent_has_kw:
                            continue  # *args/**kwargs make comparison unreliable
                        if child_args != parent_args and meth_name != '__init__':
                            warnings.append(
                                f"OVERRIDE MISMATCH {modules[mod_name].rel}:{meth_node.lineno}: "
                                f"{cls_name}.{meth_name}() has {child_args} args, "
                                f"parent {base_name}.{meth_name}() has {parent_args}")
                            override_issues += 1
    print(f"  {override_issues} mismatches")

    # ── Phase 17: Dead code detection ──
    print("\nPhase 17: Dead code (defined but never referenced)")
    dead = 0
    # Build a set of all referenced names across the entire codebase
    all_referenced: Set[str] = set()
    for mod_name, info in modules.items():
        if not info.tree: continue
        for node in ast.walk(info.tree):
            if isinstance(node, ast.Name):
                all_referenced.add(node.id)
            elif isinstance(node, ast.Attribute):
                all_referenced.add(node.attr)
    # Get names in __all__
    all_exported_set = set()
    if init_info:
        all_exported_set = set(init_info.all_exports or []) | set(init_info.all_extend_names)
    # Check each module's public functions and classes
    for mod_name, info in modules.items():
        if mod_name == '': continue  # __init__ is just re-exports
        if 'test' in mod_name: continue
        for name in list(info.functions.keys()) + list(info.classes.keys()):
            if name.startswith('_'): continue
            if name in all_referenced: continue
            if name in all_exported_set: continue
            # Check if it's imported anywhere
            imported_anywhere = False
            for other_mod, other_info in modules.items():
                if other_mod == mod_name: continue
                for _, imp_name, _, _ in other_info.imports_from_pkg:
                    if imp_name == name:
                        imported_anywhere = True; break
                if imported_anywhere: break
                for star_mod, _ in other_info.imports_star:
                    sp = resolve_relative_module(other_info.path, star_mod)
                    if sp and sp == info.path:
                        imported_anywhere = True; break
                if imported_anywhere: break
            if not imported_anywhere:
                info_msgs.append(f"DEAD CODE? {info.rel}: '{name}' — never imported or referenced elsewhere")
                dead += 1
    print(f"  {dead} potentially dead")
    # ── Phase 18: Missing return value (returns a value on some paths but falls off the end) ──
    print("\nPhase 18: Functions with missing return values")
    ret_issues = 0

    def _body_can_fall_through(body: list) -> bool:
        """Check if a statement list can fall through (no guaranteed return/raise)."""
        if not body:
            return True
        last = body[-1]
        if isinstance(last, (ast.Return, ast.Raise)):
            return False
        # Yield expressions (generators/fixtures) are intentional
        if isinstance(last, ast.Expr) and isinstance(last.value, (ast.Yield, ast.YieldFrom)):
            return False
        if isinstance(last, ast.If):
            # Both branches must be terminal
            if not last.orelse:
                return True
            return _body_can_fall_through(last.body) or _body_can_fall_through(last.orelse)
        if isinstance(last, ast.Try):
            # Conservative: terminal only if the body and every handler are terminal
            if _body_can_fall_through(last.body):
                return True
            for handler in last.handlers:
                if _body_can_fall_through(handler.body):
                    return True
            return False
        if isinstance(last, (ast.For, ast.While)):
            # Loops might not execute; check for else clause
            if last.orelse and not _body_can_fall_through(last.orelse):
                return False
            return True
        if isinstance(last, ast.With):
            return _body_can_fall_through(last.body)
        return True
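
    # Example (hypothetical) of the fall-through shape Phase 18 flags:
    #
    #     def pick(flag):
    #         if flag:
    #             return 1
    #         # no else branch -> the body can fall off the end, returning None
    #
    # An if/else where both branches return (or raise) is treated as terminal.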
    for mod_name, info in modules.items():
        if not info.tree: continue
        for node in ast.walk(info.tree):
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                if node.name.startswith('_'): continue
                # Skip generator functions (yield-based) — falling through is normal
                is_generator = any(isinstance(sub, (ast.Yield, ast.YieldFrom))
                                   for sub in ast.walk(node))
                if is_generator: continue
                # Collect return-value statements. ast.walk would descend into
                # nested functions, so walk manually and prune nested defs.
                has_value_return = False
                stack = list(ast.iter_child_nodes(node))
                while stack:
                    sub = stack.pop()
                    if isinstance(sub, (ast.FunctionDef, ast.AsyncFunctionDef, ast.Lambda)):
                        continue  # don't attribute nested returns to this function
                    if (isinstance(sub, ast.Return) and sub.value is not None and
                            not (isinstance(sub.value, ast.Constant) and sub.value.value is None)):
                        has_value_return = True
                        break
                    stack.extend(ast.iter_child_nodes(sub))
                if has_value_return and _body_can_fall_through(node.body):
                    warnings.append(
                        f"MISSING RETURN {info.rel}:{node.lineno}: {node.name}() "
                        f"returns a value on some paths but can fall off the end")
                    ret_issues += 1
    print(f"  {ret_issues} issues")
    # ── Phase 19: String formatting safety ──
    print("\nPhase 19: f-string / format() variable refs")
    fstr_issues = 0
    for mod_name, info in modules.items():
        if not info.tree: continue
        for node in ast.walk(info.tree):
            if isinstance(node, ast.JoinedStr):
                for val in node.values:
                    if isinstance(val, ast.FormattedValue) and isinstance(val.value, ast.Name):
                        var_name = val.value.id
                        # Can't do full scope analysis, but flag obvious issues
                        if var_name.endswith('_typo') or var_name == 'udnefined':
                            errors.append(f"FSTR TYPO {info.rel}:{node.lineno}: f-string references '{var_name}'")
                            fstr_issues += 1
    print(f"  {fstr_issues} issues")

    # ── Phase 20: Subpackage __init__.py exports ──
    print("\nPhase 20: Subpackage exports")
    sub_issues = 0
    for sub_name in ('testing', 'testing.presets'):
        sub_info = modules.get(sub_name)
        if not sub_info: continue
        sub_all = list(sub_info.all_exports or []) + sub_info.all_extend_names
        for name in sub_all:
            if name in sub_info.defined_names: continue
            found = False
            for star_mod, _ in sub_info.imports_star:
                sp = resolve_relative_module(sub_info.path, star_mod)
                if sp and sp in file_to_info and name in file_to_info[sp].defined_names:
                    found = True; break
            if not found:
                errors.append(f"SUBPKG EXPORT {sub_name}/__init__.py: '{name}' not importable")
                sub_issues += 1
        print(f"  {sub_name}: {len(sub_all)} exports, {sub_issues} missing")
    # ── Phase 21: Cross-file exception usage vs definitions ──
    print("\nPhase 21: Exception raise/catch cross-ref")
    exc_xref = 0
    # Build complete set of exception names available per module
    for mod_name, info in modules.items():
        if not info.tree: continue
        exc_available = set(info.defined_names) | EXCEPTION_BASES | PYTHON_BUILTINS
        # Add names from star imports
        for star_mod, _ in info.imports_star:
            sp = resolve_relative_module(info.path, star_mod)
            if sp and sp in file_to_info:
                exc_available |= file_to_info[sp].defined_names
                for ns, _ in file_to_info[sp].imports_star:
                    np = resolve_relative_module(file_to_info[sp].path, ns)
                    if np and np in file_to_info:
                        exc_available |= file_to_info[np].defined_names
        for node in ast.walk(info.tree):
            # Check raise statements
            if isinstance(node, ast.Raise) and node.exc:
                if isinstance(node.exc, ast.Call) and isinstance(node.exc.func, ast.Name):
                    name = node.exc.func.id
                    if name not in exc_available and (name.endswith('Error') or name.endswith('Exception')):
                        errors.append(f"RAISE UNDEF {info.rel}:{node.lineno}: raise {name}() — not available")
                        exc_xref += 1
                elif isinstance(node.exc, ast.Name):
                    name = node.exc.id
                    if name not in exc_available and (name.endswith('Error') or name.endswith('Exception')):
                        errors.append(f"RAISE UNDEF {info.rel}:{node.lineno}: raise {name} — not available")
                        exc_xref += 1
            # Check except clauses
            if isinstance(node, ast.ExceptHandler) and node.type:
                if isinstance(node.type, ast.Tuple):
                    for elt in node.type.elts:
                        if isinstance(elt, ast.Name):
                            name = elt.id
                            if name not in exc_available and (name.endswith('Error') or name.endswith('Exception')):
                                errors.append(f"CATCH UNDEF {info.rel}:{node.lineno}: except {name} — not available")
                                exc_xref += 1
                elif isinstance(node.type, ast.Name):
                    name = node.type.id
                    if name not in exc_available and (name.endswith('Error') or name.endswith('Exception')):
                        errors.append(f"CATCH UNDEF {info.rel}:{node.lineno}: except {name} — not available")
                        exc_xref += 1
    print(f"  {exc_xref} issues")
    # ── Phase 22: Abstract method implementation check ──
    print("\nPhase 22: Abstract method implementation")
    abstract_issues = 0
    # Find abstract methods
    abstract_methods: Dict[str, Set[str]] = {}  # class_name -> set of abstract method names
    for cls_name, entries in global_classes.items():
        for mod_name, ci in entries:
            for meth_name, meth_node in ci.methods.items():
                is_abstract = any(
                    (isinstance(d, ast.Name) and d.id == 'abstractmethod') or
                    (isinstance(d, ast.Attribute) and d.attr == 'abstractmethod')
                    for d in meth_node.decorator_list
                )
                if is_abstract:
                    abstract_methods.setdefault(cls_name, set()).add(meth_name)
    # Check concrete subclasses implement them
    for cls_name, entries in global_classes.items():
        for mod_name, ci in entries:
            # Check if any base has abstract methods
            for base_name in ci.bases:
                if base_name in abstract_methods:
                    missing = abstract_methods[base_name] - set(ci.methods.keys())
                    # Check if the subclass itself is abstract
                    is_abstract_cls = any(
                        any((isinstance(d, ast.Name) and d.id == 'abstractmethod')
                            for d in meth.decorator_list)
                        for meth in ci.methods.values()
                    )
                    has_abc_base = 'ABC' in ci.bases or 'ABCMeta' in ci.bases
                    if missing and not is_abstract_cls and not has_abc_base:
                        for m in missing:
                            warnings.append(
                                f"MISSING ABSTRACT {modules[mod_name].rel}:{ci.lineno}: "
                                f"{cls_name} doesn't implement {base_name}.{m}()")
                            abstract_issues += 1
    print(f"  {len(abstract_methods)} abstract classes | {abstract_issues} missing implementations")
    # ── Phase 23: Function call argument count (cross-module) ──
    print("\nPhase 23: Cross-module function arg counts")
    xmod_call_issues = 0
    # Build global function sig registry: name -> (min, max, *args, **kw, file)
    global_sigs: Dict[str, Tuple[int, int, bool, bool, str]] = {}
    for mod_name, info in modules.items():
        for fn_name, fn_node in info.functions.items():
            if fn_name.startswith('_'): continue
            args = fn_node.args
            n = len(args.args)
            d = len(args.defaults)
            global_sigs[fn_name] = (n - d, n, args.vararg is not None, args.kwarg is not None, info.rel)
    # Also add class constructors
    for cls_name, entries in global_classes.items():
        for mod_name, ci in entries:
            if '__init__' in ci.methods:
                init = ci.methods['__init__']
                args = init.args
                n = len(args.args) - 1  # minus self
                d = len(args.defaults)
                global_sigs[cls_name] = (max(0, n - d), n, args.vararg is not None,
                                         args.kwarg is not None, modules[mod_name].rel)
    # Check calls across modules
    for mod_name, info in modules.items():
        if not info.tree: continue
        for node in ast.walk(info.tree):
            if isinstance(node, ast.Call) and isinstance(node.func, ast.Name):
                fn = node.func.id
                if fn not in global_sigs: continue
                min_a, max_a, has_var, has_kw, def_file = global_sigs[fn]
                if has_var or has_kw: continue
                n_pos = len(node.args)
                has_starargs = any(isinstance(a, ast.Starred) for a in node.args)
                if has_starargs: continue
                if n_pos > max_a:
                    warnings.append(f"TOO MANY ARGS {info.rel}:{node.lineno}: "
                                    f"{fn}() called with {n_pos} positional, max is {max_a} (defined in {def_file})")
                    xmod_call_issues += 1
    print(f"  {xmod_call_issues} issues")
    # ── Phase 24: Runtime import verification ──
    print("\nPhase 24: Runtime import verification")
    import importlib
    pkg_parent = str(PKG_ROOT.parent)
    if pkg_parent not in sys.path:
        sys.path.insert(0, pkg_parent)
    rt_ok = rt_fail = rt_skip = 0
    for mod_name, info in sorted(modules.items()):
        full_mod = f"dockercraft.{mod_name}" if mod_name else "dockercraft"
        if mod_name in DOCKER_DEPENDENT:
            rt_skip += 1; continue
        try:
            importlib.import_module(full_mod)
            rt_ok += 1
        except Exception as e:
            errors.append(f"RUNTIME IMPORT FAIL: import {full_mod} — {type(e).__name__}: {e}")
            rt_fail += 1
    print(f"  {rt_ok} OK | {rt_fail} failed | {rt_skip} skipped (Docker SDK)")

    # ── Phase 25: Runtime __all__ verification ──
    print("\nPhase 25: Runtime __all__ access verification")
    rt_all_fail = 0
    try:
        import dockercraft as df_mod
        if hasattr(df_mod, '__all__'):
            for name in df_mod.__all__:
                if not hasattr(df_mod, name):
                    errors.append(f"RUNTIME __all__ MISSING: dockercraft.{name} — in __all__ but not accessible")
                    rt_all_fail += 1
    except Exception as e:
        errors.append(f"RUNTIME __all__ CHECK FAIL: {type(e).__name__}: {e}")
        rt_all_fail += 1
    print(f"  {rt_all_fail} failures")
    # ── Phase 26: Verify no file has mixed line endings or encoding issues ──
    print("\nPhase 26: File hygiene")
    hygiene = 0
    for mod_name, info in modules.items():
        if not info.source: continue
        # read_text() normalizes newlines, so inspect the raw bytes for CRLF
        raw = info.path.read_bytes()
        if b'\r\n' in raw and b'\n' in raw.replace(b'\r\n', b''):
            warnings.append(f"MIXED NEWLINES {info.rel}")
            hygiene += 1
        # Check for null bytes
        if '\x00' in info.source:
            errors.append(f"NULL BYTES {info.rel}")
            hygiene += 1
        # Check for trailing whitespace on many lines (agent artifact)
        lines = info.source.split('\n')
        trailing_count = sum(1 for l in lines if l != l.rstrip() and l.strip())
        if trailing_count > len(lines) * 0.3 and trailing_count > 20:
            info_msgs.append(f"TRAILING WS {info.rel}: {trailing_count}/{len(lines)} lines")
    print(f"  {hygiene} issues")
    # ── Phase 27: self.method() calls to non-existent methods ──
    print("\nPhase 27: self.method() calls to non-existent methods")
    self_call_issues = 0
    for cls_name, entries in global_classes.items():
        for mod_name, ci in entries:
            # Build set of all available methods (own + all ancestors)
            available_methods: Set[str] = set(ci.methods.keys()) | ci.class_attrs
            # Walk ancestor chain (multi-level)
            visit_queue = list(ci.bases)
            visited_bases: Set[str] = set()
            while visit_queue:
                base_name = visit_queue.pop(0)
                if base_name in visited_bases: continue
                visited_bases.add(base_name)
                if base_name in global_classes:
                    for _, parent_ci in global_classes[base_name]:
                        available_methods |= set(parent_ci.methods.keys()) | parent_ci.class_attrs
                        visit_queue.extend(parent_ci.bases)
            # Now find all self.method() calls in every method
            for meth_name, meth_node in ci.methods.items():
                self_param = meth_node.args.args[0].arg if meth_node.args.args else None
                if self_param != 'self': continue
                for sub in ast.walk(meth_node):
                    if (isinstance(sub, ast.Call) and
                            isinstance(sub.func, ast.Attribute) and
                            isinstance(sub.func.value, ast.Name) and
                            sub.func.value.id == 'self'):
                        called = sub.func.attr
                        if called.startswith('__') and called.endswith('__'):
                            continue  # dunders always available
                        if called in available_methods:
                            continue
                        # Check if it's a property or dynamically set attr
                        if called in ci.all_self_writes or called in ci.instance_attrs:
                            continue  # might be a callable attr
                        if called in ci.dataclass_fields:
                            continue
                        errors.append(
                            f"SELF CALL UNDEF {modules[mod_name].rel}:{sub.func.lineno}: "
                            f"{cls_name}.{meth_name}() calls self.{called}() — "
                            f"not found in {cls_name} or ancestors")
                        self_call_issues += 1
    print(f"  {self_call_issues} issues")
    # ── Phase 28: Unused imports ──
    print("\nPhase 28: Unused imports")
    unused_import_count = 0
    for mod_name, info in modules.items():
        if mod_name == '': continue  # __init__ re-exports are fine
        if not info.tree: continue
        # Collect all Name and Attribute references in this module
        used_names: Set[str] = set()
        for node in ast.walk(info.tree):
            if isinstance(node, ast.Name):
                used_names.add(node.id)
            elif isinstance(node, ast.Attribute):
                used_names.add(node.attr)
        # Also count names used in string annotations
        for node in ast.walk(info.tree):
            if isinstance(node, ast.Constant) and isinstance(node.value, str):
                # Forward-reference annotations like 'ClassName'
                if node.value in info.imported_names:
                    used_names.add(node.value)
        # Check each imported name (star imports never enter imported_names,
        # so they're not checked here)
        for imp_name, imp_line in info.imported_names.items():
            if imp_name.startswith('_'): continue
            if imp_name in used_names: continue
            # Check if it's re-exported via __all__
            if info.all_exports and imp_name in info.all_exports: continue
            if imp_name in info.all_extend_names: continue
            # Skip names that are also defined locally (shadowing is caught in Phase 12)
            if imp_name in info.functions or imp_name in info.classes: continue
            info_msgs.append(f"UNUSED IMPORT {info.rel}:{imp_line}: '{imp_name}' imported but never used")
            unused_import_count += 1
    print(f"  {unused_import_count} unused")
    # ── Phase 29: Context manager protocol (__enter__/__exit__) ──
    print("\nPhase 29: Context manager protocol (__enter__/__exit__)")
    ctx_issues = 0
    for cls_name, entries in global_classes.items():
        for mod_name, ci in entries:
            has_enter = '__enter__' in ci.methods
            has_exit = '__exit__' in ci.methods
            if has_enter and not has_exit:
                errors.append(f"CTX INCOMPLETE {modules[mod_name].rel}:{ci.lineno}: "
                              f"{cls_name} has __enter__ but no __exit__")
                ctx_issues += 1
            elif has_exit and not has_enter:
                errors.append(f"CTX INCOMPLETE {modules[mod_name].rel}:{ci.lineno}: "
                              f"{cls_name} has __exit__ but no __enter__")
                ctx_issues += 1
            # Check __exit__ signature: should be (self, exc_type, exc_val, exc_tb)
            if has_exit:
                exit_node = ci.methods['__exit__']
                n_args = len(exit_node.args.args)
                has_var = exit_node.args.vararg is not None
                has_kw = exit_node.args.kwarg is not None
                if not has_var and not has_kw and n_args != 4:
                    warnings.append(
                        f"CTX EXIT ARGS {modules[mod_name].rel}:{exit_node.lineno}: "
                        f"{cls_name}.__exit__() has {n_args} params, expected 4 "
                        f"(self, exc_type, exc_val, exc_tb)")
                    ctx_issues += 1
    print(f"  {ctx_issues} issues")
    # ── Phase 30: Return value in __init__ ──
    print("\nPhase 30: Return value in __init__")
    init_return_issues = 0
    for cls_name, entries in global_classes.items():
        for mod_name, ci in entries:
            if '__init__' not in ci.methods: continue
            init_node = ci.methods['__init__']
            # Walk manually so returns inside nested functions aren't attributed
            # to __init__ itself (ast.walk would descend into them)
            stack = list(ast.iter_child_nodes(init_node))
            while stack:
                sub = stack.pop()
                if isinstance(sub, (ast.FunctionDef, ast.AsyncFunctionDef, ast.Lambda)):
                    continue  # skip nested functions
                if isinstance(sub, ast.Return) and sub.value is not None:
                    # Allow `return None` explicitly (some patterns use it)
                    if not (isinstance(sub.value, ast.Constant) and sub.value.value is None):
                        errors.append(
                            f"INIT RETURN {modules[mod_name].rel}:{sub.lineno}: "
                            f"{cls_name}.__init__() returns a value")
                        init_return_issues += 1
                stack.extend(ast.iter_child_nodes(sub))
    print(f"  {init_return_issues} issues")
    # ── Phase 31: Mutable class-level variables (shared across instances) ──
    print("\nPhase 31: Mutable class-level variables")
    mutable_cls_issues = 0
    for cls_name, entries in global_classes.items():
        for mod_name, ci in entries:
            if ci.is_dataclass: continue  # dataclass fields are fine
            for item in ci.node.body:
                if isinstance(item, ast.Assign):
                    for target in item.targets:
                        if isinstance(target, ast.Name) and not target.id.startswith('_'):
                            # Skip ALL_CAPS names (constants by convention)
                            if target.id == target.id.upper():
                                continue
                            if isinstance(item.value, ast.List):
                                warnings.append(
                                    f"MUTABLE CLS VAR {modules[mod_name].rel}:{item.lineno}: "
                                    f"{cls_name}.{target.id} = [] — shared across instances")
                                mutable_cls_issues += 1
                            elif isinstance(item.value, ast.Dict):
                                warnings.append(
                                    f"MUTABLE CLS VAR {modules[mod_name].rel}:{item.lineno}: "
                                    f"{cls_name}.{target.id} = {{}} — shared across instances")
                                mutable_cls_issues += 1
                            elif isinstance(item.value, ast.Set):
                                warnings.append(
                                    f"MUTABLE CLS VAR {modules[mod_name].rel}:{item.lineno}: "
                                    f"{cls_name}.{target.id} = set() — shared across instances")
                                mutable_cls_issues += 1
    print(f"  {mutable_cls_issues} issues")
    # ── Phase 32: assert with tuple (always True) ──
    print("\nPhase 32: assert with tuple (always True)")
    assert_issues = 0
    for mod_name, info in modules.items():
        if not info.tree: continue
        for node in ast.walk(info.tree):
            if isinstance(node, ast.Assert) and isinstance(node.test, ast.Tuple):
                errors.append(
                    f"ASSERT TUPLE {info.rel}:{node.lineno}: "
                    f"assert(<tuple>) is always True — use assert condition, 'message'")
                assert_issues += 1
    print(f"  {assert_issues} issues")
    # ── Phase 33: Type annotation references to undefined names ──
    print("\nPhase 33: Type annotation references")
    annot_issues = 0
    # Collect all known type names from the package + builtins + typing
    typing_names = {
        'Any', 'Union', 'Optional', 'List', 'Dict', 'Set', 'Tuple', 'Type',
        'Callable', 'Iterator', 'Iterable', 'Generator', 'Sequence', 'Mapping',
        'MutableMapping', 'MutableSequence', 'MutableSet', 'FrozenSet',
        'ClassVar', 'Final', 'Literal', 'Protocol', 'TypeVar', 'Generic',
        'Awaitable', 'Coroutine', 'AsyncIterator', 'AsyncIterable',
        'ContextManager', 'AsyncContextManager', 'Pattern', 'Match',
        'IO', 'TextIO', 'BinaryIO', 'NamedTuple', 'TypedDict',
        'Counter', 'Deque', 'DefaultDict', 'OrderedDict', 'ChainMap',
        'SupportsInt', 'SupportsFloat', 'SupportsComplex', 'SupportsBytes',
        'SupportsAbs', 'SupportsRound',
    }
    builtin_types = {'int', 'str', 'float', 'bool', 'bytes', 'list', 'dict',
                     'set', 'tuple', 'type', 'None', 'object', 'complex',
                     'bytearray', 'memoryview', 'frozenset', 'range', 'slice',
                     'property', 'staticmethod', 'classmethod', 'super'}

    def _extract_annotation_names(ann_node) -> Set[str]:
        """Extract type names referenced in an annotation node."""
        names = set()
        if isinstance(ann_node, ast.Name):
            names.add(ann_node.id)
        elif isinstance(ann_node, ast.Attribute):
            # e.g., typing.Optional — skip dotted
            pass
        elif isinstance(ann_node, ast.Subscript):
            names |= _extract_annotation_names(ann_node.value)
            if isinstance(ann_node.slice, ast.Tuple):
                for elt in ann_node.slice.elts:
                    names |= _extract_annotation_names(elt)
            else:
                names |= _extract_annotation_names(ann_node.slice)
        elif isinstance(ann_node, ast.Constant) and isinstance(ann_node.value, str):
            # Forward reference: 'ClassName'
            names.add(ann_node.value)
        elif isinstance(ann_node, ast.BinOp) and isinstance(ann_node.op, ast.BitOr):
            # Python 3.10+ union: X | Y
            names |= _extract_annotation_names(ann_node.left)
            names |= _extract_annotation_names(ann_node.right)
        elif isinstance(ann_node, ast.Tuple):
            for elt in ann_node.elts:
                names |= _extract_annotation_names(elt)
        return names
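
    # For example, the annotation Dict[str, Optional['Builder']] yields
    # {'Dict', 'str', 'Optional', 'Builder'}: subscripts are unpacked
    # recursively and a quoted forward reference contributes its string value.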
| for mod_name, info in modules.items(): | |
| if not info.tree: continue | |
| available = (info.defined_names | typing_names | builtin_types | | |
| PYTHON_BUILTINS | {'Self', 'Never', 'TypeAlias', 'ParamSpec', | |
| 'Concatenate', 'TypeGuard', 'Unpack', | |
| 'Required', 'NotRequired'}) | |
| # Add names from star imports | |
| for star_mod, _ in info.imports_star: | |
| sp = resolve_relative_module(info.path, star_mod) | |
| if sp and sp in file_to_info: | |
| available |= file_to_info[sp].defined_names | |
| for ns, _ in file_to_info[sp].imports_star: | |
| np = resolve_relative_module(file_to_info[sp].path, ns) | |
| if np and np in file_to_info: | |
| available |= file_to_info[np].defined_names | |
| # Also collect names imported inside function bodies (lazy imports) | |
| lazy_imported: Set[str] = set() | |
| for node in ast.walk(info.tree): | |
| if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): | |
| for sub in ast.walk(node): | |
| if isinstance(sub, ast.ImportFrom) and sub.names: | |
| for alias in sub.names: | |
| lazy_imported.add(alias.asname or alias.name) | |
| elif isinstance(sub, ast.Import): | |
| for alias in sub.names: | |
| lazy_imported.add(alias.asname or alias.name.split('.')[0]) | |
| local_available = available | lazy_imported | |
| for node in ast.walk(info.tree): | |
| ann = None | |
| if isinstance(node, ast.AnnAssign) and node.annotation: | |
| ann = node.annotation | |
| elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): | |
| if node.returns: | |
| for name in _extract_annotation_names(node.returns): | |
| if name not in local_available and not name.startswith('_'): | |
| warnings.append( | |
| f"UNDEF TYPE {info.rel}:{node.lineno}: " | |
| f"return annotation references '{name}' — not in scope") | |
| annot_issues += 1 | |
| for arg in node.args.posonlyargs + node.args.args + node.args.kwonlyargs: | |
| if arg.annotation: | |
| for name in _extract_annotation_names(arg.annotation): | |
| if name not in local_available and not name.startswith('_'): | |
| warnings.append( | |
| f"UNDEF TYPE {info.rel}:{arg.lineno}: " | |
| f"param '{arg.arg}' annotation references '{name}' — not in scope") | |
| annot_issues += 1 | |
| if ann: | |
| for name in _extract_annotation_names(ann): | |
| if name not in local_available and not name.startswith('_'): | |
| warnings.append( | |
| f"UNDEF TYPE {info.rel}:{node.lineno}: " | |
| f"annotation references '{name}' — not in scope") | |
| annot_issues += 1 | |
| print(f" {annot_issues} issues") | |
| # ── Phase 34: Bare raise outside except handler ── | |
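| # Example flagged (illustrative): | |
| #     def close(self): | |
| #         if self._closed: | |
| #             raise          # RuntimeError at runtime: no active exception | |
| # A bare 'raise' is only valid while an exception is being handled, | |
| # i.e. inside an except block. | |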
| print("\nPhase 34: Bare raise outside except handler") | |
| bare_raise_issues = 0 | |
| def _check_bare_raise(stmts, rel, in_except=False): | |
|     # rel: relative path of the module being scanned (used in messages) | |
|     count = 0 | |
|     for stmt in stmts: | |
|         if isinstance(stmt, ast.Raise) and stmt.exc is None and not in_except: | |
|             errors.append(f"BARE RAISE {rel}:{stmt.lineno}: " | |
|                           f"bare 'raise' outside except handler") | |
|             count += 1 | |
|         if isinstance(stmt, ast.Try): | |
|             count += _check_bare_raise(stmt.body, rel, in_except) | |
|             for handler in stmt.handlers: | |
|                 count += _check_bare_raise(handler.body, rel, in_except=True) | |
|             count += _check_bare_raise(stmt.orelse, rel, in_except) | |
|             count += _check_bare_raise(stmt.finalbody, rel, in_except)  # ast.Try always has finalbody | |
|         elif isinstance(stmt, ast.If): | |
|             count += _check_bare_raise(stmt.body, rel, in_except) | |
|             count += _check_bare_raise(stmt.orelse, rel, in_except) | |
|         elif isinstance(stmt, (ast.For, ast.AsyncFor, ast.While)): | |
|             count += _check_bare_raise(stmt.body, rel, in_except) | |
|             count += _check_bare_raise(stmt.orelse, rel, in_except) | |
|         elif isinstance(stmt, (ast.With, ast.AsyncWith)): | |
|             count += _check_bare_raise(stmt.body, rel, in_except) | |
|         elif isinstance(stmt, (ast.FunctionDef, ast.AsyncFunctionDef)): | |
|             count += _check_bare_raise(stmt.body, rel, False)  # nested def resets context | |
|         elif isinstance(stmt, ast.ClassDef): | |
|             count += _check_bare_raise(stmt.body, rel, False) | |
|     return count | |
| for mod_name, info in modules.items(): | |
| if not info.tree: continue | |
| bare_raise_issues += _check_bare_raise(info.tree.body, info.rel) | |
| print(f" {bare_raise_issues} issues") | |
| # ── Phase 35: None comparison with == or != ── | |
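| # Example flagged (illustrative): | |
| #     if container == None: ...    # __eq__ may be overridden; identity is meant | |
| # PEP 8: comparisons to singletons like None should use 'is' / 'is not'. | |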
| print("\nPhase 35: None comparison with == / !=") | |
| none_cmp_issues = 0 | |
| for mod_name, info in modules.items(): | |
| if not info.tree: continue | |
| for node in ast.walk(info.tree): | |
| if isinstance(node, ast.Compare): | |
| for op, comparator in zip(node.ops, node.comparators): | |
| is_none = (isinstance(comparator, ast.Constant) and comparator.value is None) | |
| left_none = (isinstance(node.left, ast.Constant) and node.left.value is None) | |
| if (is_none or left_none) and isinstance(op, (ast.Eq, ast.NotEq)): | |
| warnings.append( | |
| f"NONE CMP {info.rel}:{node.lineno}: " | |
| f"use 'is None' / 'is not None' instead of '==' / '!='") | |
| none_cmp_issues += 1 | |
| print(f" {none_cmp_issues} issues") | |
| # ── Phase 36: Property consistency (setter without getter) ── | |
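| # Example flagged (illustrative): a setter left behind after its getter | |
| # was renamed away: | |
| #     @tag.setter                  # no '@property def tag' in the class | |
| #     def tag(self, value): ... | |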
| print("\nPhase 36: Property setter/deleter without getter") | |
| prop_issues = 0 | |
| for cls_name, entries in global_classes.items(): | |
| for mod_name, ci in entries: | |
| property_names: Set[str] = set() | |
| setter_names: Set[str] = set() | |
| deleter_names: Set[str] = set() | |
| for meth_name, meth_node in ci.methods.items(): | |
| for dec in meth_node.decorator_list: | |
| if isinstance(dec, ast.Name) and dec.id == 'property': | |
| property_names.add(meth_name) | |
| elif isinstance(dec, ast.Attribute): | |
| if dec.attr == 'setter': | |
| setter_names.add(meth_name) | |
| elif dec.attr == 'deleter': | |
| deleter_names.add(meth_name) | |
| for name in setter_names - property_names: | |
| errors.append(f"PROP ORPHAN {modules[mod_name].rel}: " | |
| f"{cls_name}.{name}.setter without @property getter") | |
| prop_issues += 1 | |
| for name in deleter_names - property_names: | |
| errors.append(f"PROP ORPHAN {modules[mod_name].rel}: " | |
| f"{cls_name}.{name}.deleter without @property getter") | |
| prop_issues += 1 | |
| print(f" {prop_issues} issues") | |
| # ── Phase 37: self.attr writes in methods that don't exist in __init__ (deep) ── | |
| # Different from Phase 9: this checks that methods writing NEW attrs (not in __init__) | |
| # don't create attrs that conflict with or shadow existing method names | |
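| # Example flagged (illustrative, hypothetical class): | |
| #     class Builder: | |
| #         def stage(self): ... | |
| #         def configure(self): | |
| #             self.stage = "final"   # instance attr now shadows stage() | |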
| print("\nPhase 37: Attribute/method name conflicts") | |
| attr_conflict_issues = 0 | |
| for cls_name, entries in global_classes.items(): | |
| for mod_name, ci in entries: | |
| method_names = set(ci.methods.keys()) | |
| # Attrs written in non-init methods that share names with methods | |
| for meth_name, meth_node in ci.methods.items(): | |
| if meth_name == '__init__': continue | |
| self_param = meth_node.args.args[0].arg if meth_node.args.args else None | |
| if self_param != 'self': continue | |
| for sub in ast.walk(meth_node): | |
| if isinstance(sub, ast.Assign): | |
| for t in sub.targets: | |
| if (isinstance(t, ast.Attribute) and | |
| isinstance(t.value, ast.Name) and | |
| t.value.id == 'self'): | |
| attr = t.attr | |
| if (attr in method_names and | |
| attr not in ci.instance_attrs and | |
| attr not in ci.class_attrs and | |
| not attr.startswith('_')): | |
| warnings.append( | |
| f"ATTR/METHOD CLASH {modules[mod_name].rel}:{sub.lineno}: " | |
| f"{cls_name}.{meth_name}() sets self.{attr} which shadows method {attr}()") | |
| attr_conflict_issues += 1 | |
| print(f" {attr_conflict_issues} issues") | |
| # ── Phase 38: Dynamic dispatch pattern verification ── | |
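| # Examples of the dispatch patterns verified here (illustrative): | |
| #     getattr(self, 'render')          # flagged if neither the class nor its | |
| #                                      # direct bases define it | |
| #     getattr(self, f"_parse_{kind}")  # flagged if no method starts '_parse_' | |
| # A third 'default' argument makes the lookup safe, so such calls are skipped. | |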
| print("\nPhase 38: Dynamic dispatch verification (getattr/f-string method lookup)") | |
| dispatch_issues = 0 | |
| for mod_name, info in modules.items(): | |
| if not info.tree: continue | |
| for node in ast.walk(info.tree): | |
| if not isinstance(node, ast.Call): continue | |
| if not isinstance(node.func, ast.Name): continue | |
| if node.func.id != 'getattr': continue | |
| if len(node.args) < 2: continue | |
| obj_arg = node.args[0] | |
| attr_arg = node.args[1] | |
| has_default = len(node.args) >= 3 # getattr(obj, name, default) | |
| # Only check getattr(self, ...) patterns | |
| if not (isinstance(obj_arg, ast.Name) and obj_arg.id == 'self'): | |
| continue | |
| if has_default: continue # has fallback, safe | |
| # Check if the attr is a constant string | |
| if isinstance(attr_arg, ast.Constant) and isinstance(attr_arg.value, str): | |
| attr_name = attr_arg.value | |
| # Find which class this is in | |
| for parent_node in ast.walk(info.tree): | |
| if isinstance(parent_node, ast.ClassDef): | |
| if any(isinstance(item, (ast.FunctionDef, ast.AsyncFunctionDef)) and | |
| any(sub is node for sub in ast.walk(item)) | |
| for item in parent_node.body): | |
| cls_ci = info.classes.get(parent_node.name) | |
| if cls_ci and attr_name not in cls_ci.methods and attr_name not in cls_ci.instance_attrs: | |
| # Check ancestors | |
| found = False | |
| for bname in cls_ci.bases: | |
| if bname in global_classes: | |
| for _, pci in global_classes[bname]: | |
| if attr_name in pci.methods or attr_name in pci.instance_attrs: | |
| found = True; break | |
| if found: break | |
| if not found: | |
| warnings.append( | |
| f"DISPATCH MISS {info.rel}:{node.lineno}: " | |
| f"getattr(self, '{attr_name}') without default — " | |
| f"attribute not in class") | |
| dispatch_issues += 1 | |
| break | |
| # Check f-string patterns like getattr(self, f"_parse_{x}") | |
| elif isinstance(attr_arg, ast.JoinedStr): | |
| # Extract prefix from the f-string | |
| prefix = "" | |
| for val in attr_arg.values: | |
| if isinstance(val, ast.Constant): | |
| prefix += val.value | |
| else: | |
| break # stop at first variable part | |
| if prefix: | |
| # Check that at least some methods match the prefix | |
| for parent_node in ast.walk(info.tree): | |
| if isinstance(parent_node, ast.ClassDef): | |
| if any(isinstance(item, (ast.FunctionDef, ast.AsyncFunctionDef)) and | |
| any(sub is node for sub in ast.walk(item)) | |
| for item in parent_node.body): | |
| cls_ci = info.classes.get(parent_node.name) | |
| if cls_ci: | |
| matching = [m for m in cls_ci.methods if m.startswith(prefix)] | |
| if not matching: | |
| warnings.append( | |
| f"DISPATCH EMPTY {info.rel}:{node.lineno}: " | |
| f"getattr(self, f'{prefix}...') — no methods match prefix '{prefix}'") | |
| dispatch_issues += 1 | |
| break | |
| print(f" {dispatch_issues} issues") | |
| # ── Phase 39: Except clause re-raises with 'raise e' instead of 'raise' ── | |
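| # Example noticed (illustrative): | |
| #     except APIError as e: | |
| #         cleanup() | |
| #         raise e        # re-raises, but marks this line as the raise site | |
| # A bare 'raise' re-raises the active exception with its traceback untouched. | |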
| print("\nPhase 39: Exception re-raise style (raise e vs raise)") | |
| reraise_issues = 0 | |
| for mod_name, info in modules.items(): | |
| if not info.tree: continue | |
| for node in ast.walk(info.tree): | |
| if isinstance(node, ast.ExceptHandler) and node.name: | |
| exc_var = node.name | |
| for sub in ast.walk(node): | |
| if (isinstance(sub, ast.Raise) and | |
| isinstance(sub.exc, ast.Name) and | |
| sub.exc.id == exc_var and | |
| sub.cause is None): | |
| # 'raise e' re-raises but records this line as the raise site; bare 'raise' keeps the original traceback intact | |
| info_msgs.append( | |
| f"RERAISE {info.rel}:{sub.lineno}: " | |
| f"'raise {exc_var}' — consider bare 'raise' to preserve traceback") | |
| reraise_issues += 1 | |
| print(f" {reraise_issues} notices") | |
| # ── Phase 40: Global keyword usage ── | |
| print("\nPhase 40: Global keyword usage") | |
| global_issues = 0 | |
| for mod_name, info in modules.items(): | |
| if not info.tree: continue | |
| if 'test' in mod_name: continue # tests can use global | |
| for node in ast.walk(info.tree): | |
| if isinstance(node, ast.Global): | |
| for name in node.names: | |
| info_msgs.append( | |
| f"GLOBAL VAR {info.rel}:{node.lineno}: 'global {name}' — consider alternatives") | |
| global_issues += 1 | |
| print(f" {global_issues} uses") | |
| # ── Phase 41: isinstance() checks against undefined/unimported types ── | |
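| # Example flagged (illustrative, hypothetical name): | |
| #     if isinstance(instr, CopyInstruction): ...   # flagged when the name was | |
| #                                                  # never imported or defined | |
| # Lowercase second arguments are assumed to be variables and are skipped. | |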
| print("\nPhase 41: isinstance() against undefined types") | |
| isinstance_issues = 0 | |
| for mod_name, info in modules.items(): | |
| if not info.tree: continue | |
| available = (info.defined_names | builtin_types | PYTHON_BUILTINS | | |
| {'NoneType', 'ModuleType', 'FunctionType', 'MethodType', | |
| 'TracebackType', 'CodeType', 'FrameType'}) | |
| for star_mod, _ in info.imports_star: | |
| sp = resolve_relative_module(info.path, star_mod) | |
| if sp and sp in file_to_info: | |
| available |= file_to_info[sp].defined_names | |
| for ns, _ in file_to_info[sp].imports_star: | |
| np = resolve_relative_module(file_to_info[sp].path, ns) | |
| if np and np in file_to_info: | |
| available |= file_to_info[np].defined_names | |
| for node in ast.walk(info.tree): | |
| if (isinstance(node, ast.Call) and | |
| isinstance(node.func, ast.Name) and | |
| node.func.id in ('isinstance', 'issubclass') and | |
| len(node.args) >= 2): | |
| type_arg = node.args[1] | |
| type_names = [] | |
| if isinstance(type_arg, ast.Name): | |
| type_names.append(type_arg.id) | |
| elif isinstance(type_arg, ast.Tuple): | |
| for elt in type_arg.elts: | |
| if isinstance(elt, ast.Name): | |
| type_names.append(elt.id) | |
| for tname in type_names: | |
| if tname not in available and not tname.startswith('_'): | |
| # Only flag names that look like type names (capitalized) | |
| # not variables like 'instruction_type', 'cls', 'target_type' | |
| if not tname[0].isupper(): | |
| continue # lowercase = variable, not a type literal | |
| errors.append( | |
| f"ISINSTANCE UNDEF {info.rel}:{node.lineno}: " | |
| f"{node.func.id}(..., {tname}) — '{tname}' not in scope") | |
| isinstance_issues += 1 | |
| print(f" {isinstance_issues} issues") | |
| # ── Phase 42: Missing __init__.py in subdirectories ── | |
| print("\nPhase 42: Missing __init__.py in package directories") | |
| missing_init = 0 | |
| for dirpath, dirnames, filenames in os.walk(PKG_ROOT): | |
| # Skip __pycache__ and hidden dirs | |
| dirnames[:] = [d for d in dirnames if not d.startswith(('.', '__pycache__'))] | |
| if any(f.endswith('.py') and f != '__init__.py' for f in filenames): | |
| init_path = Path(dirpath) / '__init__.py' | |
| if not init_path.exists() and dirpath != str(PKG_ROOT): | |
| rel = str(Path(dirpath).relative_to(PKG_ROOT.parent)) | |
| errors.append(f"MISSING __init__.py: {rel}/ has .py files but no __init__.py") | |
| missing_init += 1 | |
| print(f" {missing_init} issues") | |
| # ── Phase 43: Nested function/class depth ── | |
| print("\nPhase 43: Excessive nesting depth") | |
| nesting_issues = 0 | |
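| # Depth counts enclosing defs/classes: a function is flagged only when it | |
| # sits inside three or more of them (module-level functions are depth 0). | |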
| def _check_nesting(node, rel, depth=0, path=""): | |
|     # rel: relative path of the module being scanned (used in messages) | |
|     count = 0 | |
|     for child in ast.iter_child_nodes(node): | |
|         if isinstance(child, (ast.FunctionDef, ast.AsyncFunctionDef)): | |
|             new_path = f"{path}.{child.name}" if path else child.name | |
|             if depth >= 3: | |
|                 warnings.append( | |
|                     f"DEEP NESTING {rel}:{child.lineno}: " | |
|                     f"function '{new_path}' at nesting depth {depth}") | |
|                 count += 1 | |
|             count += _check_nesting(child, rel, depth + 1, new_path) | |
|         elif isinstance(child, ast.ClassDef): | |
|             new_path = f"{path}.{child.name}" if path else child.name | |
|             count += _check_nesting(child, rel, depth + 1, new_path) | |
|         else: | |
|             count += _check_nesting(child, rel, depth, path) | |
|     return count | |
| for mod_name, info in modules.items(): | |
| if not info.tree: continue | |
| nesting_issues += _check_nesting(info.tree, info.rel) | |
| print(f" {nesting_issues} issues") | |
| # ── Phase 44: Inconsistent string quoting in same file (agent artifact) ── | |
| print("\nPhase 44: String quoting consistency") | |
| quote_issues = 0 | |
| for mod_name, info in modules.items(): | |
| if not info.source: continue | |
| # Count single- vs double-quoted literals. Rough heuristic: the lookbehind | |
| # skips quotes adjacent to other quotes (triple-quoted strings) or word | |
| # characters; comments and docstrings are still counted, so treat the | |
| # result as a signal only. | |
| single = len(re.findall(r"(?<!['\"\w])'[^'\n]*'(?!')", info.source)) | |
| double = len(re.findall(r'(?<![\'"\w])"[^"\n]*"(?!")', info.source)) | |
| total = single + double | |
| if total > 20: | |
| minority_pct = min(single, double) / total * 100 | |
| if 30 < minority_pct < 50: # Genuinely mixed (not just a few outliers) | |
| info_msgs.append( | |
| f"MIXED QUOTES {info.rel}: {single} single-quoted vs {double} double-quoted " | |
| f"({minority_pct:.0f}% minority)") | |
| quote_issues += 1 | |
| print(f" {quote_issues} files with mixed quoting") | |
| # ── Phase 45: Duplicate string constants (copy-paste detection) ── | |
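| # Literals longer than 80 chars are compared with whitespace normalized, so | |
| # re-wrapped copies of the same pasted text still match; 3+ occurrences | |
| # suggest copy-paste rather than a shared module-level constant. | |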
| print("\nPhase 45: Cross-module duplicate long strings (copy-paste artifacts)") | |
| long_strings: Dict[str, List[Tuple[str, int]]] = defaultdict(list) | |
| for mod_name, info in modules.items(): | |
| if not info.tree: continue | |
| if 'test' in mod_name: continue | |
| for node in ast.walk(info.tree): | |
| if (isinstance(node, ast.Constant) and isinstance(node.value, str) and | |
| len(node.value) > 80 and not node.value.startswith(('#', '/', 'http'))): | |
| # Normalize whitespace for comparison | |
| key = ' '.join(node.value.split()) | |
| long_strings[key].append((info.rel, node.lineno)) | |
| dup_str_count = 0 | |
| for s, locations in long_strings.items(): | |
| if len(locations) > 2:  # same long string at 3+ locations | |
|     files = [f"{f}:{l}" for f, l in locations[:5]] | |
|     info_msgs.append( | |
|         f"DUP STRING at {len(locations)} locations: '{s[:60]}...' — {', '.join(files)}") | |
| dup_str_count += 1 | |
| print(f" {dup_str_count} duplicated strings") | |
| # ── Phase 46: Method argument name consistency with parent ── | |
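| # Example noticed (illustrative, hypothetical classes): | |
| #     class Base:        def render(self, indent): ... | |
| #     class Child(Base): def render(self, spacing): ... | |
| # Keyword callers of render(indent=...) break on Child instances. | |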
| print("\nPhase 46: Method arg name consistency with parent") | |
| arg_name_issues = 0 | |
| for cls_name, entries in global_classes.items(): | |
| for mod_name, ci in entries: | |
| for base_name in ci.bases: | |
| if base_name not in global_classes: continue | |
| for _, parent_ci in global_classes[base_name]: | |
| for meth_name, meth_node in ci.methods.items(): | |
| if meth_name.startswith('_') and not meth_name.startswith('__'): continue | |
| if meth_name not in parent_ci.methods: continue | |
| parent_meth = parent_ci.methods[meth_name] | |
| # Compare arg names (not just counts) | |
| child_arg_names = [a.arg for a in meth_node.args.args] | |
| parent_arg_names = [a.arg for a in parent_meth.args.args] | |
| if len(child_arg_names) == len(parent_arg_names): | |
| for i, (cn, pn) in enumerate(zip(child_arg_names, parent_arg_names)): | |
| if i == 0: continue # self/cls | |
| if cn != pn: | |
| info_msgs.append( | |
| f"ARG RENAME {modules[mod_name].rel}:{meth_node.lineno}: " | |
| f"{cls_name}.{meth_name}() arg '{cn}' was '{pn}' " | |
| f"in {base_name}.{meth_name}()") | |
| arg_name_issues += 1 | |
| print(f" {arg_name_issues} renamed args") | |
| # ── Phase 47: Cross-file API contract verification ── | |
| # Check that if multiple classes have a method with the same name (duck-typing contract), | |
| # their signatures are compatible | |
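| # Example inconsistency (illustrative, hypothetical classes): | |
| #     Volume.render(self), Network.render(self, indent), Stage.render(self, indent, ctx) | |
| # Callers duck-typing over .render() cannot treat these interchangeably; | |
| # *args/**kwargs signatures are excluded below as intentionally flexible. | |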
| print("\nPhase 47: Duck-typing signature consistency") | |
| duck_issues = 0 | |
| # Collect all method signatures across all classes (excluding dunders and private) | |
| method_sigs: Dict[str, List[Tuple[str, str, int, bool, bool]]] = defaultdict(list) | |
| # name -> [(cls, file, n_args, has_var, has_kw)] | |
| for cls_name, entries in global_classes.items(): | |
| for mod_name, ci in entries: | |
| for meth_name, meth_node in ci.methods.items(): | |
| if meth_name.startswith('_'): continue | |
| args = meth_node.args | |
| n = len(args.args) - 1 # minus self | |
| method_sigs[meth_name].append(( | |
| cls_name, modules[mod_name].rel, n, | |
| args.vararg is not None, args.kwarg is not None | |
| )) | |
| for meth_name, sigs in method_sigs.items(): | |
| if len(sigs) < 3: continue # only check widely-used method names | |
| # Filter out classes with *args/**kwargs (flexible signatures) | |
| fixed_sigs = [(c, f, n) for c, f, n, v, k in sigs if not v and not k] | |
| if len(fixed_sigs) < 3: continue | |
| arg_counts = set(n for _, _, n in fixed_sigs) | |
| if len(arg_counts) > 1: | |
| details = [f"{c}({n} args)" for c, f, n in fixed_sigs[:5]] | |
| info_msgs.append( | |
| f"DUCK SIG MISMATCH: {meth_name}() has different arg counts: {', '.join(details)}") | |
| duck_issues += 1 | |
| print(f" {duck_issues} inconsistencies") | |
| # ── Phase 48: Verify __str__ consistency (every instruction should have __str__) ── | |
| print("\nPhase 48: Instruction __str__ coverage") | |
| str_issues = 0 | |
| instruction_classes = set() | |
| for cls_name, entries in global_classes.items(): | |
| for mod_name, ci in entries: | |
| if 'Instruction' in ci.bases or any( | |
| b in ci.bases for b in | |
| ('BuildKitInstruction', 'RunMount', 'CopyAdvanced', 'FromAdvanced', | |
| 'AddAdvanced', 'HealthcheckAdvanced', 'BuildKitDirective') | |
| ): | |
| instruction_classes.add(cls_name) | |
| if '__str__' not in ci.methods and '__repr__' not in ci.methods: | |
| # Check if parent has it | |
| has_in_parent = False | |
| queue = list(ci.bases) | |
| seen = set() | |
| while queue: | |
| b = queue.pop(0) | |
| if b in seen: continue | |
| seen.add(b) | |
| if b in global_classes: | |
| for _, pci in global_classes[b]: | |
| if '__str__' in pci.methods or '__repr__' in pci.methods: | |
| has_in_parent = True | |
| break | |
| queue.extend(pci.bases) | |
| if has_in_parent: break | |
| if not has_in_parent: | |
| info_msgs.append( | |
| f"NO __str__ {modules[mod_name].rel}: " | |
| f"instruction class '{cls_name}' has no __str__ or __repr__") | |
| str_issues += 1 | |
| print(f" {len(instruction_classes)} instruction classes | {str_issues} without __str__") | |
| # ── Phase 49: Verify all 'from .X import Y' targets exist in target's __all__ ── | |
| # (when target has __all__) | |
| print("\nPhase 49: Import vs __all__ cross-check") | |
| import_all_issues = 0 | |
| for mod_name, info in modules.items(): | |
| for imp_mod, imp_name, lineno, level in info.imports_from_pkg: | |
| target_path = resolve_relative_module(info.path, imp_mod, level) | |
| if target_path is None: continue | |
| ti = file_to_info.get(target_path) | |
| if ti is None: continue | |
| if ti.all_exports is None: continue # no __all__ = no restriction | |
| all_set = set(ti.all_exports) | set(ti.all_extend_names) | |
| if imp_name != '*' and imp_name not in all_set: | |
| # Only warn if target actually restricts with __all__ | |
| if all_set: | |
| info_msgs.append( | |
| f"NOT IN __all__ {info.rel}:{lineno}: " | |
| f"'{imp_name}' imported from {imp_mod} but not in its __all__") | |
| import_all_issues += 1 | |
| print(f" {import_all_issues} imports outside __all__") | |
| # ── Phase 50: Exception handler that catches and silently passes ── | |
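| # Example flagged (illustrative): | |
| #     try: | |
| #         client.close() | |
| #     except Exception: | |
| #         pass            # hides every failure, including real bugs | |
| # Handlers naming a specific exception (except KeyError: pass) are skipped. | |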
| print("\nPhase 50: Silent exception swallowing") | |
| silent_issues = 0 | |
| for mod_name, info in modules.items(): | |
| if not info.tree: continue | |
| if 'test' in mod_name: continue | |
| for node in ast.walk(info.tree): | |
| if isinstance(node, ast.ExceptHandler): | |
| body = node.body | |
| if len(body) == 1 and isinstance(body[0], ast.Pass): | |
| # Check what's being caught | |
| if node.type: | |
| exc_name = "" | |
| if isinstance(node.type, ast.Name): | |
| exc_name = node.type.id | |
| elif isinstance(node.type, ast.Tuple): | |
| exc_name = "multiple" | |
| # Silently swallowing broad exceptions is concerning | |
| if exc_name in ('Exception', 'BaseException', 'multiple', ''): | |
| warnings.append( | |
| f"SILENT EXCEPT {info.rel}:{node.lineno}: " | |
| f"except {exc_name}: pass — silently swallows errors") | |
| silent_issues += 1 | |
| elif node.type is None: | |
| # Already caught by bare except check | |
| pass | |
| print(f" {silent_issues} silent handlers") | |
| # ═══════════════════════════════ SUMMARY ═══════════════════════════════ | |
| print("\n" + "=" * 72) | |
| print(" SUMMARY") | |
| print("=" * 72) | |
| if errors: | |
| print(f"\n ERRORS ({len(errors)}):") | |
| for e in errors: | |
| print(f" [ERR] {e}") | |
| if warnings: | |
| print(f"\n WARNINGS ({len(warnings)}):") | |
| for w in warnings: | |
| print(f" [WARN] {w}") | |
| if info_msgs: | |
| print(f"\n INFO ({len(info_msgs)}):") | |
| for i in info_msgs: | |
| print(f" [INFO] {i}") | |
| if not errors and not warnings and not info_msgs: | |
| print("\n ALL CHECKS PASSED — no issues found!") | |
| print(f"\n Total: {len(errors)} errors, {len(warnings)} warnings, {len(info_msgs)} info") | |
| print("=" * 72) | |
| return len(errors) | |
| if __name__ == '__main__': | |
| sys.exit(main()) |