Consolidated: March 25, 2026
Original dates: January 7-8, 2026
Sources: 10 strategic/architecture gists consolidated into one
Contents: Master Synthesis, Risk Analysis, ggen Comparison, Code Archaeology, Naming Schemes, Use Cases, Enhancement Plan, System Context, Features Roadmap, Oracle Handoff
- Master Synthesis - GPT-5.2-pro analysis of 27 research gists
- Oracle/ChatGPT Handoff - Autonomous software company + ggen internal rigor
- Risk Deep Dive - 6 risks analyzed with failure modes and mitigations
- Architecture Comparison - BLACKICE vs ggen Thesis (18 discovered components)
- Enhancement Plan - Enhanced with ggen Principles
- Code Archaeology - What ChatGPT Missed (18 production-ready components)
- Use Cases - Regulated code gen, CI/CD, cost tracking, compliance
- System Context Drop - 54K+ lines, 72 features, 19 sources
- Features Roadmap - Ultimate roadmap from 19 project analyses
- Naming Schemes - 3 options for repo + 8 primitives
Original gist: 183f236ab723563f546c72d72860c3e6
BLACKICE Master Synthesis: GPT-5.2-pro analysis of 27 research gists - Unified vision + Build order + Conflicts resolved
Source: GPT-5.2-pro analysis of 27 research gists
Date: January 8, 2026
Unified Vision: BLACKICE is an autonomous software company — user gives vision, system works until it ships working code. All complexity is internal.
Build Order: Phase 1 (foundation) → Phase 2 (safety) → Receipts → Specs → Intelligence → Polish
Across all 27 documents, the vision is consistent:
BLACKICE is an autonomous software company: the user gives a natural-language "vision" (build X), and the system works until it ships working code—planning, implementing, testing, fixing failures, and delivering a repo.
The engine is a Ralph Loop ("try → fail → reflect → learn → retry") plus multi-agent consensus, plus hard guardrails for budget/safety, and persistent state for recovery.
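The Ralph Loop's control flow can be sketched as a small driver. This is a minimal sketch, not BLACKICE's actual API: `execute`, `evaluate`, and `reflect` are illustrative injected callables, and `lessons` is the "learn" channel fed into each retry.

```python
def ralph_loop(task, execute, evaluate, reflect, max_attempts=5):
    """Try → fail → reflect → learn → retry, until success or budget exhausted.

    `execute(task, lessons)` produces an output, `evaluate(output)` judges it,
    and `reflect(task, output)` turns a failure into a lesson that is passed
    back into the next attempt. All three names are illustrative.
    """
    lessons = []
    for attempt in range(1, max_attempts + 1):
        output = execute(task, lessons)
        if evaluate(output):
            return output, attempt
        lessons.append(reflect(task, output))
    raise RuntimeError(f"budget exhausted after {max_attempts} attempts")
```

In the real system, the guardrails for budget/safety would live around this loop (the `max_attempts` budget here is the simplest such guardrail), and the lessons would persist via memory rather than a local list.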
Keep the UX the same, but add spec/validation/receipts internally (inspired by ggen's spec-first determinism):
- Fewer wasted tokens (validate earlier)
- Dependency-correct scheduling
- Compliance/auditability
- Reproducibility/debuggability via receipts
| # | Item | Source | Why First |
|---|---|---|---|
| 1 | Provider Registry | ClaudeBar | Everything else depends on it |
| 2 | Per-project config cascade | Superset | Can't scale without repo-specific constraints |
| 3 | blackice doctor | ACFS | Reduces "toolchain missing" failures |
| 4 | Status notifications | Superset | Preserves UX while reducing anxiety |
| 5 | Completion marker detection | Ralph Orchestrator | The control loop's "stop condition" |
| 6 | Continuation enforcement | Oh-My-OpenCode | Eliminates "agent quit early" failures |
| 7 | Forced attention recovery | Planning-with-Files | Prevents long-run drift |
| 8 | Conditional execution + concurrency limits | Petit | Robust workflows, no runaway resources |
| 9 | Fail-safe defaults + security masking | Safety-Net | Safe even if misconfigured |
Phase 1 Exit Gate: doctor passes fresh install; config loads; completion markers detected; status notifications working.
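As a sketch of item 5, completion-marker detection might look like the following. The marker strings here are assumptions for illustration, not Ralph Orchestrator's real conventions; matching per line avoids stopping the loop when a marker is merely quoted mid-sentence.

```python
import re

# Hypothetical completion markers; the real set would come from the
# orchestrator's conventions, not these examples.
COMPLETION_MARKERS = [
    r"^ALL TESTS PASS\b",
    r"^TASK COMPLETE\b",
    r"<promise>DONE</promise>",
]

def detect_completion(agent_output: str) -> bool:
    """Return True if any completion marker appears in the agent's output.

    Anchored markers are checked against each stripped line, so a marker
    mentioned in passing prose does not count as a stop condition.
    """
    for line in agent_output.splitlines():
        stripped = line.strip()
        for marker in COMPLETION_MARKERS:
            if re.search(marker, stripped):
                return True
    return False
```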
| # | Item | Source |
|---|---|---|
| 10 | Command safety pipeline (5-stage) | Auto-Claude + Safety-Net |
| 11 | Self-validating QA loop | Auto-Claude |
| 12 | Git hooks + CI mode + caching | Guardian Angel |
| 13 | Unified quality scoring | Wayfound + Quint |
| 14 | Pre-execution guidelines query | Wayfound |
Phase 2 Exit Gate: "Production-ready safety layer with quality-gated execution."
| # | Item | Notes |
|---|---|---|
| 15 | Receipt store v1 | Hash input/output + provenance chain |
| 16 | Spec layer v0 | Start JSON/Pydantic, SHACL later |
| 17 | Dependency ordering v0 | Topological sort first, SPARQL later |
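Item 17's "topological sort first" needs nothing beyond the standard library. A minimal sketch, where `deps` maps each task to the set of tasks it depends on (the function name and input shape are illustrative):

```python
from graphlib import TopologicalSorter

def order_tasks(deps: dict[str, set[str]]) -> list[str]:
    """Dependency-correct scheduling order (v0): plain topological sort.

    Raises graphlib.CycleError on circular dependencies, which is exactly
    the failure we want surfaced before any tokens are spent.
    """
    return list(TopologicalSorter(deps).static_order())
```

For example, `order_tasks({"api": {"db"}, "ui": {"api"}, "db": set()})` yields the build order `["db", "api", "ui"]`; SPARQL only becomes necessary once the dependency graph carries semantics a plain sort cannot express.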
| # | Item | Source |
|---|---|---|
| 18 | Continuity ledger + handoffs | Continuous-Claude |
| 19 | Artifact index (SQLite FTS5) | Roadmap |
| 20 | Q-cycle reasoning + decision docs | Quint-Code |
| 21 | SOP generation + task extraction | Acontext |
| 22 | Cascading verification + proactive spawning | Claude-Workflow |
| # | Item | Source |
|---|---|---|
| 23 | Convoys / work bundling | Gas Town + MassGen |
| 24 | OpenAI-compatible API wrapper | MassGen |
| 25 | Manifest-driven agent registry | ACFS |
| 26 | Built-in diff viewer | Superset |
| 27 | Async human-in-the-loop (optional) | Plannotator |
The consolidated roadmap resolved these contradictions:
| Conflict | Sources | Resolution |
|---|---|---|
| State management | Event store vs ledgers vs files | Layered: Beads (immutable) + scratchpads + workspaces + snapshots |
| Quality eval | Binary vs grades vs confidence | Unified: raw score + letter grade + confidence + breakdown |
| Memory/learning | Events vs semantic vs insights vs SOPs | 4-layer: SOP store + insights DB + Letta semantic + Beads log |
| Command safety | Static vs dynamic vs semantic vs sandbox | 5-stage pipeline: unwrap → parse → allowlist → policy → sandbox |
| Agent coordination | Consensus vs spawning vs patrol vs handoffs | Unified lifecycle manager |
| Configuration | Per-project vs rules vs manifest | 5-level cascade: defaults → user → project → rules → manifest |
| Model routing | Capability vs role vs parallel | Enhanced router: role/capability/parallel/auto + self-registration |
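The resolved command-safety pipeline (unwrap → parse → allowlist → policy → sandbox) might be sketched as below. Note this sketch parses before unwrapping, since unwrapping operates on tokens; the allowlist, wrapper set, and policy rule are all illustrative, and stage 5 (sandboxed execution) is deliberately left out.

```python
import shlex

ALLOWED_BINARIES = {"git", "python", "pytest", "ls", "cat"}  # illustrative allowlist
WRAPPERS = {"sudo", "env", "nohup", "time"}                  # prefixes to unwrap

def check_command(command: str) -> tuple[bool, str]:
    """Stages 1-4 of the pipeline: unwrap → parse → allowlist → policy.

    Stage 5 (sandbox) is out of scope for a sketch; a real implementation
    would hand approved commands to a container or jail.
    """
    # Parse first (shlex mirrors shell tokenization) so we can unwrap tokens.
    try:
        tokens = shlex.split(command)
    except ValueError as exc:
        return False, f"parse error: {exc}"
    if not tokens:
        return False, "empty command"
    # Unwrap wrapper prefixes like `sudo` or `env FOO=1`.
    while tokens and (tokens[0] in WRAPPERS or "=" in tokens[0]):
        tokens = tokens[1:]
    if not tokens:
        return False, "nothing left after unwrapping"
    # Allowlist the actual binary, not the wrapper.
    if tokens[0] not in ALLOWED_BINARIES:
        return False, f"binary not allowlisted: {tokens[0]}"
    # Policy layer: illustrative rule forbidding force pushes.
    if tokens[0] == "git" and "--force" in tokens:
        return False, "policy: force push forbidden"
    return True, "ok"
```

Unwrapping before allowlisting is the point of the stage order: `sudo rm -rf /` must be judged as `rm`, not as `sudo`.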
- What "done" means for SaaS vs CLI vs library
- Required artifacts (docs, tests, deploy scripts)
- Acceptance checks the system can run autonomously
- Fixed suite of benchmark "visions"
- Replayable runs
- Tracked metrics (cost/time/success)
- Regression gating on improvements
- Dependency policy (pinning, lockfiles)
- Secrets scanning + injection patterns
- SAST/dependency vulnerability scanning
- SBOM generation
- Network egress policies
- Runnable starter (one command)
- Environment bootstrap
- Deploy path
- Clear README for what was built
- Start JSON/Pydantic schemas
- SHACL/RDF only for enterprise mode
- SPARQL optional until graphs outgrow topo-sort
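A spec layer v0 in this spirit could be a plain schema class. The sketch below uses a stdlib dataclass as a stand-in for Pydantic (which would add type coercion and richer validation); all field names are illustrative.

```python
from dataclasses import dataclass, field, asdict
import json

@dataclass
class TaskSpecV0:
    """Spec layer v0: a plain schema, no RDF/SHACL required.

    A real spec would be auto-generated by the LLM from the user's vision;
    these fields are placeholders to show the shape of the idea.
    """
    name: str
    description: str
    language: str = "python"
    depends_on: list[str] = field(default_factory=list)

    def validate(self) -> list[str]:
        """Return a list of problems; an empty list means valid."""
        problems = []
        if not self.name:
            problems.append("name is required")
        if not self.description:
            problems.append("description is required")
        return problems

    def to_json(self) -> str:
        # Sorted keys keep serialization deterministic for hashing/receipts.
        return json.dumps(asdict(self), sort_keys=True)
```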
- Provider Registry
- Per-project config cascade
- blackice doctor
- Status notifications
- Completion markers
- Continuation enforcement
- Forced attention recovery
- Conditional execution + concurrency limits
Then immediately: Phase 2 safety pipeline + QA loop
This makes "vision → software" feel reliable because the system stops drifting, stops quitting early, and stops failing for boring environment reasons.
| # | Document | Gist |
|---|---|---|
| 1 | Oracle Handoff | f2a484c2ef0be80c3e611a3f05455215 |
| 2 | System Documentation | 9569ccc3aa932d75f19d702b9d945f4c |
| 3 | Ultimate Features Roadmap | c20aa4f397cade28d885902d6b58aef7 |
| 4 | Risk Deep Dive | 6a69c866da5089828dee823b07b0910b |
| 5 | Auto-Claude Ideas | 3fe6e9c14fbaab1a04ac6c04e9b12cc8 |
| 6 | Oh-My-OpenCode Ideas | 4442ce070009cc6674820a517b64a8a3 |
| 7 | Architecture Comparison | a36334c63186f70925e37e3e285ae66d |
| 8 | Use Cases | f92f5648c958c604c514f26d3ad4f1fd |
Turn Phase 1 into an executable engineering sprint plan (tickets + acceptance criteria + integration points), starting with:
- Provider Registry
- Config cascade
- blackice doctor
- Completion markers
- Continuation enforcement
Master synthesis by GPT-5.2-pro via Oracle, January 8, 2026
Original gist: f2a484c2ef0be80c3e611a3f05455215
BLACKICE 2.0 Oracle/ChatGPT Handoff - Autonomous software company + ggen internal rigor
For: Oracle/ChatGPT review
From: Claude Code archaeology session
Date: January 7, 2026
BLACKICE is an autonomous software company — you give it a vision ("build me a SaaS"), it works until it's done.
We discovered 18 major components in the codebase that weren't documented, then compared it to the ggen PhD thesis on specification-first code generation.
Proposal: Enhance BLACKICE with ggen's internal rigor (specs, validation, audit trails) while keeping the same UX: "tell me your vision → get working software."
An autonomous AI software company with ~54,000 lines of Python:
User: "Build me a restaurant reservation SaaS with Square payments"
BLACKICE: *works autonomously for hours/days*
- Plans the architecture
- Generates code
- Tests it
- Fixes failures (Reflexion loop)
- Learns from mistakes (Letta memory)
- Retries until success
BLACKICE: "Done. Here's your repo."
| # | Component | Purpose |
|---|---|---|
| 1 | Company Operations | GitHub/deployment automation |
| 2 | Cancellation Tokens | 7 reasons, 3 modes, propagation |
| 3 | Resource Scheduler | Memory/CPU/GPU constraints |
| 4 | Agent Mail Protocol | Inter-agent messaging |
| 5 | Git Checkpoint Manager | Rollback, 5 triggers |
| 6 | Cloud Storage | S3/GCS/Azure/Local backends |
| 7 | Artifact Store | Build output tracking |
| 8 | Semantic Memory | Embeddings + model tracking |
| 9 | Design Patterns | Strategy, Chain, Builder, Factory, Decorator |
| 10 | Memory Store | Letta 0.16+ Archives API |
| 11 | Reflexion Loop | Self-improving execution |
| 12 | Models + State Machine | 40+ event types |
| 13 | Validator Framework | Pluggable validation |
| 14 | Orchestrator | Multi-agent coordination |
| 15 | OpenTelemetry Tracer | Distributed tracing |
| 16 | Prometheus Metrics | Full observability |
| 17 | Retry Engine | Exponential backoff |
| 18 | Agent Registry | Capability-based routing |
Title: "Specification-First Code Generation at Enterprise Scale"
Core Idea: The Chatman Equation: A = μ(O)
- A = Generated artifacts
- μ = Measurement function (code generator)
- O = Ontological specification (RDF)
Key Features:
- RDF specifications (formal task schemas)
- SHACL validation (pre-execution checks)
- SPARQL queries (dependency ordering)
- blake3 receipts (cryptographic audit trail)
- Deterministic: same spec → same code
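A minimal sketch of the determinism-plus-receipts idea: canonicalize the spec, hash both sides, and chain receipts. The thesis uses blake3; `hashlib.blake2b` is the closest stdlib stand-in, and the field names here are assumptions.

```python
import hashlib
import json

def canonicalize(obj) -> bytes:
    """Canonical JSON: sorted keys, no whitespace variance, UTF-8."""
    return json.dumps(obj, sort_keys=True, separators=(",", ":")).encode()

def receipt(spec: dict, output: str, prev_receipt_hash: str = "") -> dict:
    """Minimal hash-chained receipt (blake2b standing in for blake3).

    Same spec + same output → byte-identical receipt, which is what makes
    runs reproducible and the audit trail verifiable.
    """
    spec_hash = hashlib.blake2b(canonicalize(spec), digest_size=16).hexdigest()
    output_hash = hashlib.blake2b(output.encode(), digest_size=16).hexdigest()
    body = {"spec_hash": spec_hash, "output_hash": output_hash, "prev": prev_receipt_hash}
    body["receipt_hash"] = hashlib.blake2b(canonicalize(body), digest_size=16).hexdigest()
    return body
```

Because the spec is canonicalized before hashing, key order and whitespace cannot change the hash; only a change to the spec's content does.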
| Dimension | ggen | BLACKICE |
|---|---|---|
| Paradigm | Specification-first (deterministic) | Vision-first (adaptive) |
| Input | Formal RDF specs | Natural language |
| Guarantees | Mathematical (hash-verified) | Statistical (learning) |
| Memory | Stateless | Letta (cross-session) |
| Strengths | Reproducibility, compliance | Autonomy, adaptation |
Add ggen's rigor INTERNALLY without changing user experience.
User: "Build me X"
BLACKICE: *works* → "Here's X"
Vision (natural language)
↓
AUTO-GENERATE specs (LLM translates vision to internal specs)
↓
SHACL validates (catch problems before burning tokens)
↓
SPARQL orders (build dependencies correctly)
↓
Execute with Reflexion (existing self-improvement)
↓
Log receipts (silent audit trail)
↓
Loop until vision achieved
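The pipeline above can be written as one illustrative driver with all collaborators injected. None of these names are BLACKICE's real API; `generate_specs` is assumed to return `{name: {"deps": [...]}}`-shaped specs.

```python
from graphlib import TopologicalSorter

def run_vision(vision, generate_specs, validate, execute, log_receipt, achieved,
               max_rounds=5):
    """Sketch of the internal loop: vision → auto-specs → validate →
    dependency order → execute → receipts → repeat until achieved."""
    for round_no in range(1, max_rounds + 1):
        specs = generate_specs(vision)
        problems = validate(specs)
        if problems:
            # Catch bad specs before burning tokens; fold feedback into
            # the next spec-generation round.
            vision += f"\n(fix: {problems})"
            continue
        graph = {name: set(spec["deps"]) for name, spec in specs.items()}
        for name in TopologicalSorter(graph).static_order():
            log_receipt(name, execute(name, specs[name]))  # silent audit trail
        if achieved(vision):
            return round_no
    raise RuntimeError("vision not achieved within budget")
```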
| Benefit | How |
|---|---|
| Fewer wasted tokens | Validate before execute |
| Smarter ordering | Dependency-aware scheduling |
| Compliance-ready | Automatic audit trail |
| Reproducible | Hash-verified outputs |
| Debuggable | Receipt chain for failures |
┌─────────────────────────────────────────────────────────────────────────────┐
│ BLACKICE 2.0 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ USER INPUT: Natural language vision │
│ ↓ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ NEW: Specification Layer (from ggen) - INTERNAL/INVISIBLE │ │
│ │ Vision → Auto-Specs → SHACL Validate → SPARQL Dependencies │ │
│ └─────────────────────────────────────────┬───────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ EXISTING: Execution Layer │ │
│ │ SafetyGuard → LLMRouter → DAGExecutor → Reflexion → Letta │ │
│ └─────────────────────────────────────────┬───────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ NEW: Verification Layer (from ggen) - INTERNAL/INVISIBLE │ │
│ │ Canonicalize → blake3 Hash → Receipt Store (audit trail) │ │
│ └─────────────────────────────────────────┬───────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ EXISTING: Memory & Recovery Layer │ │
│ │ LettaAdapter → Beads → RecoveryManager → DeadLetterQueue │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ ↓ │
│ OUTPUT: Working software │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
- Regulated industries (healthcare, finance) — audit trail proves compliance
- Multi-team usage — catch bad tasks before wasting tokens
- CI/CD integration — security constraint enforcement
- Cost tracking — receipt-based attribution
- Failure debugging — receipt chain shows what went wrong
- Reproducibility — hash-verified outputs for research
User experience must remain: "Give vision, get software"
All spec/validation/receipt stuff is INTERNAL. User never writes RDF, never learns SHACL, never touches SPARQL. The system auto-generates all of that from their natural language vision.
- Does this hybrid approach make sense? (ggen rigor + BLACKICE autonomy)
- What's missing? We have 18 discovered components + proposed spec layer. Gaps?
- Implementation priority? What should be built first?
- Alternative approaches? Is there a simpler way to get audit trails + validation without full RDF/SHACL?
- Risk assessment? What could go wrong with this approach?
| Document | URL |
|---|---|
| Archaeology Comparison | https://gist.github.com/jmanhype/a36334c63186f70925e37e3e285ae66d |
| Enhancement Plan | https://gist.github.com/jmanhype/303c716fa9cc17c1733aedb1758362e5 |
| Use Cases | https://gist.github.com/jmanhype/f92f5648c958c604c514f26d3ad4f1fd |
- BLACKICE: /Users/speed/proxmox/blackice/integrations/ralph/ (~54K lines Python)
- ggen thesis: github.com/seanchatmangpt/ggen/tree/master/thesis
Handoff prepared by Claude Code on January 7, 2026
Original gist: 6a69c866da5089828dee823b07b0910b
BLACKICE 2.0 Risk Analysis: Deep dive on 6 risks from Oracle/GPT-5.2-pro review
Each HIGH and MEDIUM risk analyzed with examples, failure modes, and mitigations
Risk Level: 🔴 HIGH
BLACKICE 2.0 needs to convert natural language visions into formal specifications:
User: "Build me a restaurant reservation SaaS"
↓
System must generate:
↓
task:restaurant-saas a task:Project ;
task:hasDescription "Restaurant reservation SaaS" ;
task:requiresAuth task:OAuth2 ;
task:requiresDatabase task:PostgreSQL ;
task:requiresPayment task:StripeIntegration ;
task:requiresDeployment task:KubernetesCluster ;
...50 more inferred requirements...
The gap: User said 12 words. System must infer 50+ technical decisions.
User: "Build me a blog"
System generates:
- task:hasDescription "A blog"
- task:targetLanguage "python"
- (nothing else)
Result: SHACL validation PASSES (minimal spec is "valid")
But execution produces garbage (no auth, no database, no frontend)
User: "Build me a simple landing page"
System generates:
- task:requiresAuth task:OAuth2
- task:requiresDatabase task:PostgreSQL
- task:requiresPayment task:Stripe
- task:requiresKubernetes true
Result: System builds a $50K enterprise app for a $500 landing page
Burns 500K tokens on unnecessary complexity
User: "Build me a mobile app for iOS"
System generates:
- task:targetLanguage "python"
- task:framework "Django"
Result: Builds a web app, not iOS app
User gets wrong deliverable entirely
User: "Build a real-time chat app, keep it simple, must scale to millions"
System generates:
- task:architecture "monolith" (from "keep it simple")
- task:scalability "horizontal" (from "scale to millions")
- task:database "SQLite" (from "simple")
- task:expectedUsers 10000000
Result: SHACL validation fails on conflicts
Or worse: passes but produces impossible architecture
| Challenge | Why |
|---|---|
| Ambiguity | "Simple" means different things to different people |
| Implicit knowledge | "SaaS" implies auth, billing, multi-tenancy — but user didn't say that |
| Domain expertise | System must know "restaurant reservation" needs table management, time slots, no-show policies |
| Scope creep | Where does "restaurant SaaS" end? Inventory? Staff scheduling? Analytics? |
class SpecGenerator:
async def generate(self, vision: str) -> tuple[TaskSpec, float, str | None]:
spec = await self.llm_generate(vision)
confidence = await self.score_confidence(vision, spec)
if confidence < 0.7:
# Ask ONE clarifying question
clarification = await self.generate_clarification(vision, spec)
return spec, confidence, clarification
return spec, confidence, None
# Example:
spec, conf, question = await gen.generate("Build me a blog")
# conf = 0.4
# question = "Should this blog support multiple authors, comments, or be a simple personal blog?"

DOMAIN_TEMPLATES = {
"saas": {
"required": ["auth", "billing", "multi_tenancy"],
"common": ["admin_dashboard", "api", "webhooks"],
"optional": ["analytics", "audit_logs"]
},
"landing_page": {
"required": ["responsive_design"],
"common": ["contact_form", "analytics"],
"optional": ["cms"]
},
"mobile_app": {
"required": ["target_platform"], # iOS, Android, both
"common": ["push_notifications", "offline_support"],
"optional": ["in_app_purchases"]
}
}
# Detect domain, apply template, fill gaps

Attempt 1: Generate minimal spec from vision
→ Execute → Fails (missing database)
Attempt 2: Add database to spec based on failure
→ Execute → Fails (missing auth)
Attempt 3: Add auth to spec based on failure
→ Execute → Success
# Spec evolves with execution, not just at start
# Store spec versions in receipts for debugging
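The evolving-spec idea can be sketched as a loop that grows the spec from failures and keeps every version for the receipt trail. The dict-based spec and `execute`'s `(ok, missing_requirement)` return shape are assumptions for illustration.

```python
def refine_spec_until_success(spec: dict, execute, max_attempts=5):
    """Mitigation sketch: start minimal, grow the spec from failures.

    `execute(spec)` returns (ok, missing) where `missing` names the
    requirement the failure revealed (or None on success). Every spec
    version is retained so receipts can show how the spec evolved.
    """
    versions = [dict(spec)]
    for _ in range(max_attempts):
        ok, missing = execute(spec)
        if ok:
            return spec, versions
        spec = {**spec, missing: True}  # add the requirement the failure revealed
        versions.append(dict(spec))
    raise RuntimeError("spec did not converge")
```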
class SHACLValidator:
def validate(self, spec: Graph, mode: str = "strict") -> ValidationResult:
if mode == "strict":
# All shapes must pass
return self._strict_validate(spec)
elif mode == "permissive":
# Warn on missing optional fields
# Only fail on critical missing fields
return self._permissive_validate(spec)
elif mode == "learning":
# Log all issues but never block
# Use for initial spec generator training
return self._learning_validate(spec)

| Metric | Target | Alert If |
|---|---|---|
| Spec generation confidence | >0.7 avg | <0.5 on any task |
| Clarification questions asked | <2 per vision | >3 per vision |
| Spec-related failures | <10% of runs | >25% of runs |
| Spec revision count | <3 per task | >5 per task |
Risk Level: 🔴 HIGH
Receipts prove what happened, not that it was correct.
Receipt:
{
"spec_hash": "abc123",
"output_hash": "def456",
"status": "success",
"model": "claude-sonnet-4-20250514"
}
Auditor: "Great, you have receipts. But is the code actually HIPAA compliant?"
You: "Uh... the receipt says success?"
Auditor: "That's not what I asked."
Task: Generate HIPAA-compliant patient API
Result: Code runs without errors
Receipt: status = "success"
Reality:
- No encryption at rest
- No audit logging
- PHI exposed in error messages
- Technically "successful" but completely non-compliant
Task: Generate payment processing
Result: All 47 generated tests pass
Receipt: status = "success", tests_passed = 47
Reality:
- Tests only check happy path
- No edge cases (refunds, disputes, failures)
- Code charges customers twice on retry
- "100% test pass rate" is meaningless
Auditor: "Can you prove this code hasn't been tampered with?"
You: "Yes! blake3(output) = def456, matches receipt"
Auditor: "Can you prove it doesn't have SQL injection?"
You: "...no, that's not what the hash proves"
Management: "We have cryptographic audit trails!"
Reality: Audit trails prove code was generated, not that it's compliant
SOC2 Auditor: "Show me evidence of access controls"
You: *shows receipt with output_hash*
Auditor: "This proves nothing about access controls"
| Stakeholder | False Belief | Reality |
|---|---|---|
| Management | "We're compliant because we have receipts" | Receipts ≠ compliance |
| Developers | "If it passed, it's good" | "Passed" = no crash, not "correct" |
| Auditors | "Hash chain = secure" | Hash proves integrity, not security |
| Legal | "We can prove what happened" | Yes, but not that it was right |
@dataclass
class EnhancedReceipt:
# Existing fields
spec_hash: str
output_hash: str
status: str
# NEW: Verification results (not just "success/fail")
verification_results: dict = field(default_factory=dict)
# Example verification_results:
# {
# "unit_tests": {"passed": 47, "failed": 0, "coverage": 0.82},
# "security_scan": {"critical": 0, "high": 2, "medium": 5},
# "lint": {"errors": 0, "warnings": 12},
# "type_check": {"errors": 0},
# "hipaa_checklist": {"passed": 14, "failed": 2, "na": 4},
# "dependency_audit": {"vulnerabilities": 0}
# }

# In task spec, define compliance requirements
task:patient-api a task:CodeGenTask ;
task:complianceRequirements [
task:requireEncryptionAtRest true ;
task:requireAuditLogging true ;
task:requireAccessControl true ;
task:requirePHIRedaction true ;
task:maxSecurityVulnerabilities 0 ;
task:minTestCoverage 0.80
] .

# Validator checks compliance requirements
class ComplianceValidator:
async def validate(self, output: CodeOutput, requirements: ComplianceReqs) -> ComplianceResult:
results = {}
if requirements.require_encryption_at_rest:
results["encryption"] = await self.check_encryption(output)
if requirements.require_audit_logging:
results["audit_logging"] = await self.check_audit_logging(output)
if requirements.max_security_vulnerabilities is not None:
scan = await self.run_security_scan(output)
results["security"] = scan.critical_count <= requirements.max_security_vulnerabilities
return ComplianceResult(
compliant=all(results.values()),
details=results
)

class TaskResult:
# Execution status (did it crash?)
execution_status: Literal["success", "failed", "timeout", "cancelled"]
# Quality status (is it good?)
quality_status: Literal["verified", "unverified", "failed_verification"]
# Compliance status (is it compliant?)
compliance_status: Literal["compliant", "non_compliant", "not_checked", "partially_compliant"]
# Only mark truly "done" if all three pass
@property
def is_complete(self) -> bool:
return (
self.execution_status == "success" and
self.quality_status == "verified" and
self.compliance_status == "compliant"
)

{
"receipt_id": "abc123",
"spec_hash": "...",
"output_hash": "...",
"execution_status": "success",
"verification_evidence": {
"tests": {
"runner": "pytest",
"version": "8.0.0",
"passed": 47,
"failed": 0,
"skipped": 2,
"coverage": 0.82,
"report_hash": "..."
},
"security_scan": {
"tool": "bandit",
"version": "1.7.0",
"findings": [],
"report_hash": "..."
},
"compliance_checks": {
"framework": "HIPAA",
"checklist_version": "2024.1",
"passed": ["encryption", "audit_logging", "access_control"],
"failed": [],
"evidence_hashes": {"encryption": "...", "audit_logging": "..."}
}
}
}

| Auditor Question | Receipt Alone | Receipt + Verification |
|---|---|---|
| "Was code generated?" | ✅ Yes | ✅ Yes |
| "By what model?" | ✅ Yes | ✅ Yes |
| "Is it tamper-proof?" | ✅ Hash proves it | ✅ Hash proves it |
| "Does it have tests?" | ❌ No idea | ✅ Test results in receipt |
| "Is it secure?" | ❌ No idea | ✅ Scan results in receipt |
| "Is it HIPAA compliant?" | ❌ No idea | ✅ Checklist results in receipt |
Risk Level: 🔴 HIGH
BLACKICE's value = "give vision, get software"
If SHACL is too strict:
User: "Build me a quick prototype"
SHACL: ❌ REJECTED - Missing required field: task:securityModel
SHACL: ❌ REJECTED - Missing required field: task:scalabilityTarget
SHACL: ❌ REJECTED - Missing required field: task:complianceFramework
SHACL: ❌ REJECTED - Missing required field: task:disasterRecoveryPlan
User: "I just wanted a prototype! This is worse than Jira!"
Vision: "Simple todo app"
Validation errors:
1. Missing authentication strategy
2. Missing database selection
3. Missing deployment target
4. Missing test coverage target
5. Missing documentation requirements
6. Missing accessibility requirements
7. Missing internationalization requirements
8. Missing performance benchmarks
9. Missing security scan requirements
10. Missing compliance framework
...
User: *closes BLACKICE, opens cursor*
# Shapes designed for enterprise use cases
task:TaskShape a sh:NodeShape ;
sh:property [
sh:path task:costCenter ;
sh:minCount 1 ; # Required for enterprise billing
] ;
sh:property [
sh:path task:projectCode ;
sh:minCount 1 ; # Required for enterprise tracking
] ;
sh:property [
sh:path task:approvalChain ;
sh:minCount 1 ; # Required for enterprise governance
] .
# Solo developer trying to build a side project:
# "Why do I need a cost center for my hobby app?"
# Chicken-and-egg problem:
User: "Build me an API"
SHACL: "What endpoints?"
User: "I don't know yet, that's what I want you to figure out"
SHACL: "Can't validate without endpoints specified"
User: "But I'm asking you to design them"
SHACL: "Invalid spec. Rejected."
# Shape requires PostgreSQL for "production" tasks
task:ProductionTaskShape a sh:NodeShape ;
sh:property [
sh:path task:database ;
sh:hasValue task:PostgreSQL ;
sh:message "Production tasks must use PostgreSQL"
] .
# User wants to deploy to Cloudflare Workers (no PostgreSQL)
# Valid architecture, but SHACL rejects it
| Strict Validation | User Experience |
|---|---|
| Every field required | "This is more work than coding it myself" |
| No flexibility | "I can't experiment or prototype" |
| Enterprise-only shapes | "This isn't for me" |
| Blocks on ambiguity | "I don't know the answer yet" |
class ValidationMode(Enum):
PROTOTYPE = "prototype" # Minimal validation, maximum flexibility
DEVELOPMENT = "development" # Moderate validation, some flexibility
PRODUCTION = "production" # Strict validation, enterprise requirements
REGULATED = "regulated" # Maximum validation, compliance requirements
class Validator:
def validate(self, spec: TaskSpec, mode: ValidationMode) -> ValidationResult:
shapes = self.get_shapes_for_mode(mode)
return self.run_validation(spec, shapes)
# User can say: "Build me a prototype" → PROTOTYPE mode
# Or: "Build me a HIPAA-compliant patient portal" → REGULATED mode

class ValidationSeverity(Enum):
INFO = "info" # Log it, don't show user
WARNING = "warning" # Show user, don't block
ERROR = "error" # Block in strict mode, warn in permissive
FATAL = "fatal" # Always block (security issues, impossible specs)
# Example shape with severity
task:AuthShape a sh:NodeShape ;
sh:property [
sh:path task:authStrategy ;
sh:minCount 1 ;
sh:severity sh:Warning ; # Warn, don't block
sh:message "No auth strategy specified - will default to none"
] .

class SpecEnricher:
"""Fill gaps with sensible defaults instead of rejecting."""
DEFAULTS = {
"prototype": {
"database": "sqlite",
"auth": "none",
"deployment": "local",
"tests": "minimal"
},
"production": {
"database": "postgresql",
"auth": "oauth2",
"deployment": "kubernetes",
"tests": "comprehensive"
}
}
def enrich(self, spec: TaskSpec, mode: str) -> TaskSpec:
defaults = self.DEFAULTS.get(mode, self.DEFAULTS["prototype"])
for field, default in defaults.items():
if not getattr(spec, field, None):
setattr(spec, field, default)
spec.add_note(f"Defaulted {field} to {default}")
return spec

class ProgressiveValidator:
"""Validate incrementally as task progresses."""
async def validate_at_stage(self, spec: TaskSpec, stage: str) -> ValidationResult:
if stage == "planning":
# Only check: does this make sense?
return self.validate_minimal(spec)
elif stage == "architecture":
# Check: are major decisions made?
return self.validate_architecture(spec)
elif stage == "implementation":
# Check: are implementation details complete?
return self.validate_implementation(spec)
elif stage == "deployment":
# Check: is it production-ready?
return self.validate_production(spec)
# Don't require deployment config at planning stage
# Don't require architecture at idea stage

User: "Build me a todo app"
BLACKICE: "Quick question - what level of rigor?
[1] Prototype (fastest, minimal validation)
[2] Side project (some validation)
[3] Production (full validation)
[4] Enterprise (compliance-ready)"
User: "1"
BLACKICE: "Got it, prototype mode. Skipping enterprise validations."
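A keyword heuristic could pre-select a mode from the vision's own wording and only fall back to asking when nothing matches. The keyword lists are illustrative assumptions, and the enum is re-declared here so the sketch is self-contained.

```python
from enum import Enum

class ValidationMode(Enum):
    PROTOTYPE = "prototype"
    DEVELOPMENT = "development"
    PRODUCTION = "production"
    REGULATED = "regulated"

# Illustrative keyword heuristics, checked in order of strictness so a
# "quick HIPAA-compliant prototype" still lands in REGULATED mode.
_KEYWORDS = [
    (ValidationMode.REGULATED, ("hipaa", "soc2", "pci", "compliant", "compliance")),
    (ValidationMode.PRODUCTION, ("production", "customers", "launch")),
    (ValidationMode.PROTOTYPE, ("prototype", "quick", "hack", "demo", "toy")),
]

def infer_mode(vision: str, default=ValidationMode.DEVELOPMENT) -> ValidationMode:
    """Pick a validation mode from the vision's wording, falling back to a
    moderate default when nothing matches."""
    text = vision.lower()
    for mode, words in _KEYWORDS:
        if any(w in text for w in words):
            return mode
    return default
```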
┌─────────────────────────────────────────────────────────────────┐
│ VALIDATION SPECTRUM │
├─────────────────────────────────────────────────────────────────┤
│ │
│ TOO LOOSE TOO STRICT
│ ────────────────────────────────────────────────────────────────
│ │ │
│ │ "Anything goes" "Sensible defaults" "Jira++" │
│ │ (no value) (SWEET SPOT) (no users) │
│ │ │
│ ▼ ▲ ▼
│ Garbage output │ Nobody uses it
│ No audit trail │ "Too much friction"
│ Can't debug │ Users go elsewhere
│ │ │
│ TARGET HERE │
│ │
└─────────────────────────────────────────────────────────────────┘
Risk Level: 🔴 HIGH
You're building compliance-ready audit trails for regulated industries (HIPAA, SOC2).
But if your audit trail contains secrets/PII, you've created a new compliance violation.
Receipt store:
{
"task_id": "patient-api-001",
"input_hash": "abc123",
"input_content": "Generate API for patient John Smith, SSN 123-45-6789,
diagnosed with HIV on 2024-01-15, prescribed..."
// You just stored PHI in your audit log
// You are now non-compliant with HIPAA
// Congratulations, you played yourself
}
User: "Connect to database at postgres://admin:SuperSecret123@prod.db.com/patients"
System stores in receipt:
input_hash: "..."
input_content: "Connect to database at postgres://admin:SuperSecret123@..."
Attacker gets receipts → gets database credentials
Task: Generate Stripe integration
Generated code:
stripe.api_key = "sk_live_abc123xyz..."
Receipt stores:
output_hash: "..."
output_content: "<full code with API key>"
Receipt store is now a credential dump
Task: Generate email template for customer
Prompt to LLM:
"Generate welcome email for John Smith (john@example.com,
phone: 555-1234, address: 123 Main St)"
Receipt stores:
prompt_hash: "..."
prompt_content: "<full prompt with PII>"
You now have a PII database disguised as an audit log
Task fails with error:
"Authentication failed for user admin@company.com with password 'hunter2'"
Receipt stores:
error_message: "Authentication failed for user admin@company.com..."
Error logs become credential leaks
Letta memory includes:
"User previously asked about AWS account 123456789012"
"User's SSH key is: -----BEGIN RSA PRIVATE KEY-----..."
Memory hash includes reference to this
Receipt links to memory state
Memory is now attack surface
| Scenario | Consequence |
|---|---|
| Receipts leaked | All secrets in all tasks exposed |
| Receipts subpoenaed | Legal discovery reveals customer PII |
| Receipts hacked | Single breach exposes everything |
| Receipts audited | Auditor sees you're storing secrets |
| Employee access | Anyone with receipt access sees secrets |
class SecureReceiptStore:
def __init__(self, mode: str = "hash_only"):
self.mode = mode
def store(self, receipt: Receipt) -> str:
if self.mode == "hash_only":
# ONLY store hashes, never content
secure_receipt = Receipt(
spec_hash=receipt.spec_hash,
input_hash=self._hash(receipt.input_content), # Hash only
output_hash=self._hash(receipt.output_content), # Hash only
prompt_hash=self._hash(receipt.prompt_content), # Hash only
# Content fields are NOT stored
)
return self._store(secure_receipt)

import re
class SecretRedactor:
PATTERNS = [
(r'password["\']?\s*[:=]\s*["\']?[\w!@#$%^&*]+', '[REDACTED:PASSWORD]'),
(r'api[_-]?key["\']?\s*[:=]\s*["\']?[\w-]+', '[REDACTED:API_KEY]'),
(r'sk_live_[\w]+', '[REDACTED:STRIPE_KEY]'),
(r'-----BEGIN[\w\s]+PRIVATE KEY-----', '[REDACTED:PRIVATE_KEY]'),
(r'\b\d{3}-\d{2}-\d{4}\b', '[REDACTED:SSN]'),
(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '[REDACTED:EMAIL]'),
(r'postgres://[^@]+:[^@]+@', 'postgres://[REDACTED]@'),
]
def redact(self, content: str) -> str:
for pattern, replacement in self.PATTERNS:
content = re.sub(pattern, replacement, content, flags=re.IGNORECASE)
return content

from cryptography.fernet import Fernet
class EncryptedReceiptStore:
def __init__(self, encryption_key: bytes):
self.cipher = Fernet(encryption_key)
def store(self, receipt: Receipt) -> str:
# Encrypt sensitive fields before storage
encrypted_receipt = Receipt(
spec_hash=receipt.spec_hash, # Hashes don't need encryption
input_content=self._encrypt(receipt.input_content),
output_content=self._encrypt(receipt.output_content),
# ...
)
return self._store(encrypted_receipt)
def _encrypt(self, content: str) -> str:
return self.cipher.encrypt(content.encode()).decode()

class TieredReceiptStore:
def __init__(self):
self.public_store = SQLiteStore("receipts_public.db") # Hashes only
self.private_store = EncryptedStore("receipts_private.db") # Content
self.sensitive_store = HSMStore("receipts_sensitive") # Secrets
def store(self, receipt: Receipt, sensitivity: str) -> str:
if sensitivity == "public":
# Only hashes, no content
return self.public_store.store(receipt.hashes_only())
elif sensitivity == "private":
# Encrypted content, accessible to team
return self.private_store.store(receipt)
elif sensitivity == "sensitive":
# HSM-protected, audit trail for access
            return self.sensitive_store.store(receipt)

from datetime import datetime, timedelta

class RetentionPolicy:
def __init__(self):
self.policies = {
"hashes": timedelta(days=365 * 7), # Keep hashes for 7 years
"content": timedelta(days=90), # Delete content after 90 days
"secrets": timedelta(days=1), # Delete secrets after 1 day
"pii": timedelta(days=30), # Delete PII after 30 days
}
async def enforce(self):
for category, retention in self.policies.items():
cutoff = datetime.utcnow() - retention
            await self.store.delete_older_than(category, cutoff)

class ReceiptAccessControl:
ROLES = {
"developer": ["read_hashes", "read_own_receipts"],
"team_lead": ["read_hashes", "read_team_receipts"],
"auditor": ["read_hashes", "read_metadata", "export_audit_log"],
"admin": ["read_all", "delete", "configure"],
}
def check_access(self, user: User, action: str, receipt: Receipt) -> bool:
allowed_actions = self.ROLES.get(user.role, [])
if action not in allowed_actions:
self.log_denied_access(user, action, receipt)
return False
if "own" in action and receipt.user_id != user.id:
return False
        return True

┌─────────────────────────────────────────────────────────────────┐
│ SECURE RECEIPT FLOW │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Task Input │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Redactor │ ← Remove secrets/PII before processing │
│ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Hasher │────▶│ Hash Store │ ← Public: only hashes │
│ └──────┬──────┘ └─────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ Encryptor │────▶│Private Store│ ← Encrypted content │
│ └──────┬──────┘ └─────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Cleanup │ ← Retention policy enforcement │
│ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Risk Level: 🟡 MEDIUM
BLACKICE already has Beads (event store with 40+ event types). BLACKICE 2.0 proposes adding Receipts (cryptographic audit trail).
Two stores = two truths = debugging nightmare.
Beads says: Task started at 10:00:00, failed at 10:05:00
Receipts say: Task started at 10:00:01, failed at 10:04:59
Developer: "Which one is right?"
Answer: "Yes"
Day 1: Beads and Receipts agree
Day 30: Minor timestamp differences
Day 90: Receipt missing for some tasks
Day 180: Beads has events Receipts doesn't know about
Day 365: Two completely different histories
# Query Beads
beads_result = beads.query("SELECT * FROM events WHERE task_id = 'abc'")
# Returns: 47 events, last status = "failed"
# Query Receipts
receipt_result = receipts.query("SELECT * FROM receipts WHERE task_id = 'abc'")
# Returns: 3 receipts, last status = "success"
# Which is true?

System crashes. Recovery process:
RecoveryManager: "Checking Beads for incomplete tasks..."
Found: task-123 (in_progress)
ReceiptStore: "Checking receipts for task-123..."
Found: receipt shows "success"
RecoveryManager: "Is task-123 done or not?"
Auditor: "Show me the complete history of task-456"
You: "Here's the Beads events" (47 entries)
You: "Here's the Receipts" (3 entries)
Auditor: "Why don't they match?"
You: "Different granularity?"
Auditor: "This is not acceptable for compliance"
| Cause | Example |
|---|---|
| Different granularity | Beads: every event. Receipts: per-attempt summary |
| Different triggers | Beads: written by executor. Receipts: written by flywheel |
| Different failures | Beads write succeeds, Receipt write fails (or vice versa) |
| Different retention | Beads kept forever, Receipts pruned after 90 days |
| Different schemas | Beads schema evolves independently from Receipt schema |
class ReceiptStore:
"""Receipts are computed from Beads, not stored separately."""
def __init__(self, beads: BeadsClient):
self.beads = beads
def get_receipt(self, task_id: str) -> Receipt:
# Query Beads for all events for this task
events = self.beads.query_events(task_id)
# Compute receipt from events
return self._compute_receipt(events)
def _compute_receipt(self, events: list[Event]) -> Receipt:
return Receipt(
task_id=events[0].task_id,
spec_hash=self._find_spec_hash(events),
input_hash=self._compute_input_hash(events),
output_hash=self._compute_output_hash(events),
start_time=events[0].timestamp,
end_time=events[-1].timestamp,
status=events[-1].status,
# ...
        )

# When creating a receipt, store its ID in Beads
class IntegratedStore:
async def complete_task(self, task_id: str, result: TaskResult):
# Create receipt
receipt = self.receipt_store.create(task_id, result)
# Store receipt ID in Beads event
await self.beads.emit(Event(
type="task_completed",
task_id=task_id,
receipt_id=receipt.receipt_id, # Link to receipt
timestamp=datetime.utcnow()
))
        return receipt

class UnifiedEventStore:
"""One write path, multiple read views."""
async def record(self, event: Event):
# Single write to Beads
await self.beads.emit(event)
# If this is a "receipt-worthy" event, trigger receipt computation
if event.type in ["task_completed", "task_failed"]:
await self._update_receipt_cache(event)
async def _update_receipt_cache(self, event: Event):
# Compute receipt from Beads (not separate write)
events = await self.beads.query_events(event.task_id)
receipt = self._compute_receipt(events)
# Cache for fast access (but Beads is source of truth)
        await self.receipt_cache.set(event.task_id, receipt)

class MerkleAnchoredReceipts:
"""Receipts are Merkle roots over Beads events."""
def create_receipt(self, task_id: str) -> Receipt:
events = self.beads.query_events(task_id)
# Compute Merkle root over all events
merkle_root = self._compute_merkle_root(events)
return Receipt(
task_id=task_id,
beads_merkle_root=merkle_root, # Proves Beads consistency
event_count=len(events),
# ...
)
def verify_receipt(self, receipt: Receipt) -> bool:
# Re-compute Merkle root from current Beads
events = self.beads.query_events(receipt.task_id)
current_root = self._compute_merkle_root(events)
# If roots match, Beads and Receipt are consistent
        return current_root == receipt.beads_merkle_root

┌─────────────────────────────────────────────────────────────────┐
│ UNIFIED TRUTH MODEL │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────┐ │
│ │ Beads │ ← Single source of truth│
│ │ (Event Store) │ │
│ └────────┬────────┘ │
│ │ │
│ ┌───────────────┼───────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Receipts │ │ Metrics │ │ Recovery │ │
│ │ (View) │ │ (View) │ │ (View) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │ │ │ │
│ └───────────────┴───────────────┘ │
│ │ │
│ All derived from Beads │
│ No separate writes │
│ No consistency issues │
│ │
└─────────────────────────────────────────────────────────────────┘
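The `MerkleAnchoredReceipts` sketch above leaves `_compute_merkle_root` undefined. A minimal, self-contained version might look like the following, assuming SHA-256 over canonical-JSON event payloads (BLACKICE's actual hash function and event schema may differ):

```python
import hashlib
import json

def merkle_root(leaves: list[bytes]) -> str:
    """Pairwise-hash leaves up to a single root; an odd node is paired with itself."""
    if not leaves:
        return hashlib.sha256(b"").hexdigest()
    level = [hashlib.sha256(leaf).digest() for leaf in leaves]
    while len(level) > 1:
        nxt = []
        for i in range(0, len(level), 2):
            right = level[i + 1] if i + 1 < len(level) else level[i]
            nxt.append(hashlib.sha256(level[i] + right).digest())
        level = nxt
    return level[0].hex()

def events_to_leaves(events: list[dict]) -> list[bytes]:
    # Canonical JSON (sorted keys) so identical events always hash identically
    return [json.dumps(e, sort_keys=True).encode() for e in events]
```

Because the root depends on both content and order, any reordered, dropped, or altered Beads event changes the root, which is what makes `verify_receipt` meaningful.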
Risk Level: 🟡 MEDIUM
RDF, SHACL, SPARQL are powerful but obscure:
# How many Python developers know this?
from rdflib import Graph, Literal, Namespace, URIRef
from rdflib.namespace import RDF, RDFS, XSD
from pyshacl import validate
TASK = Namespace("http://blackice.dev/ontology/task#")
g = Graph()
g.bind("task", TASK)
g.add((TASK["my-task"], RDF.type, TASK.CodeGenTask))
g.add((TASK["my-task"], TASK.hasDescription, Literal("Build API")))
# ...100 more lines of graph manipulation...

Answer: Almost none. This is a hiring/maintenance problem.
Team: "The SHACL shapes are broken"
Expert: "I'll fix it"
Expert: *leaves company*
Team: "...what's a SHACL shape?"
Error: "SHACL validation failed"
Developer: "Why?"
SHACL: "sh:resultPath task:hasDescription"
Developer: "What does that mean?"
SHACL: "sh:resultMessage 'Value does not match pattern'"
Developer: "What pattern? What value?"
SHACL: *unhelpful XML dump*
Developer: *gives up*
# Innocent-looking query
result = graph.query("""
SELECT ?task WHERE {
?task task:dependsOn+ ?dep .
?dep task:status "completed" .
}
""")
# With 10,000 tasks and complex dependencies:
# Runtime: 47 seconds
# Memory: 4GB
# Developer: "Why is this so slow?"

# pyshacl version 0.20.0 works
# pyshacl version 0.21.0 changes API
# rdflib version 7.0 breaks compatibility
# Your CI/CD pipeline: 💥
New hire: "I'm a Python developer"
Codebase: "Great! Here's our RDF ontology, SHACL shapes, and SPARQL queries"
New hire: "I... don't know any of those"
Codebase: "Time to learn!"
New hire: *finds new job*
| Metric | JSON/Pydantic | RDF/SHACL/SPARQL |
|---|---|---|
| Developers who know it | 95% | <5% |
| Stack Overflow answers | Millions | Thousands |
| Debugging tools | Excellent | Limited |
| IDE support | Excellent | Poor |
| Library stability | Excellent | Variable |
| Hiring pool | Large | Tiny |
# BAD: Expose RDF everywhere
from rdflib import Graph, Namespace
graph = Graph()
graph.add((TASK["my-task"], RDF.type, TASK.CodeGenTask))
# GOOD: Clean Python interface, RDF hidden inside
class TaskSpec:
def __init__(self, task_id: str, task_type: str, description: str):
self.task_id = task_id
self.task_type = task_type
self.description = description
self._graph = self._build_graph() # Internal only
def validate(self) -> ValidationResult:
# Calls SHACL internally, returns clean Python objects
return self._validator.validate(self._graph)
# Developer never sees RDF
spec = TaskSpec("my-task", "codegen", "Build API")
result = spec.validate()
if not result.valid:
    print(result.errors)  # Clean Python, not SHACL XML

# Phase 1: JSON Schema (everyone knows this)
from typing import Literal

from pydantic import BaseModel, Field
class TaskSpec(BaseModel):
task_id: str
task_type: Literal["codegen", "refactor", "test"]
description: str = Field(min_length=10)
priority: int = Field(ge=0, le=4)
dependencies: list[str] = []
# Phase 2: Add RDF export if needed
class TaskSpec(BaseModel):
# ... same fields ...
def to_rdf(self) -> Graph:
"""Export to RDF for advanced queries (optional)."""
        # Only used when needed, not core path

class HumanReadableValidator:
def validate(self, spec: TaskSpec) -> ValidationResult:
result = self._run_shacl(spec)
if not result.valid:
# Convert cryptic SHACL errors to human-readable
human_errors = []
for error in result.shacl_errors:
human_errors.append(self._humanize(error))
return ValidationResult(
valid=False,
errors=human_errors # ["Description must be at least 10 characters"]
)
return ValidationResult(valid=True)
def _humanize(self, shacl_error: SHACLError) -> str:
MESSAGES = {
"sh:minLength": "must be at least {value} characters",
"sh:minCount": "is required",
"sh:maxCount": "can only have one value",
"sh:in": "must be one of: {values}",
}
# Convert "sh:resultPath task:hasDescription, sh:minLength 10"
        # To: "Description must be at least 10 characters"

# Test the RDF layer extensively so developers don't have to understand it
class TestTaskValidation:
def test_valid_task_passes(self):
spec = TaskSpec("task-1", "codegen", "Build a REST API")
assert spec.validate().valid
def test_short_description_fails(self):
spec = TaskSpec("task-1", "codegen", "API")
result = spec.validate()
assert not result.valid
assert "at least 10 characters" in result.errors[0]
def test_invalid_priority_fails(self):
spec = TaskSpec("task-1", "codegen", "Build API", priority=99)
result = spec.validate()
assert not result.valid
assert "priority" in result.errors[0].lower()
# 50 more tests covering all edge cases
# So developers can refactor with confidenceUse RDF if you need:
- Complex graph queries (transitive dependencies, semantic reasoning)
- Multi-tenant/federated schemas
- Integration with semantic web ecosystem
- Long-term ontology evolution
Use JSON Schema if you need:
- Simple validation
- Fast iteration
- Large hiring pool
- Minimal operational overhead
Honest assessment for BLACKICE 2.0:
Do you NEED SPARQL graph queries?
├── Yes, for complex dependency analysis → Use RDF
└── No, just need validation → Use JSON Schema
Do you NEED semantic reasoning?
├── Yes, inferring task types from properties → Use RDF
└── No, explicit task types are fine → Use JSON Schema
Do you NEED federated schemas?
├── Yes, multi-tenant with custom schemas → Use RDF
└── No, single schema is fine → Use JSON Schema
┌─────────────────────────────────────────────────────────────────┐
│ PRAGMATIC APPROACH │
├─────────────────────────────────────────────────────────────────┤
│ │
│ START HERE │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Pydantic Models + JSON Schema Validation │ │
│ │ (Everyone knows this, fast to build, easy to maintain) │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ │ If you hit limits (complex dependencies, reasoning) │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Add RDF Layer Behind Clean Interface │ │
│ │ (Hidden from developers, only used where needed) │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ │ If RDF becomes core to product │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Invest in Tooling, Training, Hiring │ │
│ │ (Make it a team competency, not one person's magic) │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
| Risk | Primary Mitigation | Fallback |
|---|---|---|
| NL → Spec brittleness | Confidence scoring + clarification | Permissive mode + iterative refinement |
| False compliance | Verification results in receipts | Separate "success" from "correct" |
| Over-constraining | Tiered strictness levels | Smart defaults + warn-not-block |
| Secrets in logs | Hash-only mode + redaction | Encryption + retention policies |
| Dual truth stores | Receipts derived from Beads | Merkle anchoring |
| Semantic-web complexity | Hide behind clean interfaces | Start with JSON Schema |
Risk analysis for BLACKICE 2.0 — January 7, 2026
Original gist: a36334c63186f70925e37e3e285ae66d
BLACKICE Architecture vs ggen Thesis: Complete Comparison (18 discovered components)
Date: January 7, 2026 Purpose: Compare BLACKICE codebase archaeology findings with the ggen PhD thesis on Specification-First Code Generation
| Dimension | ggen Thesis | BLACKICE |
|---|---|---|
| Lines of Code | ~8,748 | ~54,000 |
| Primary Language | TypeScript/Node.js | Python |
| Core Paradigm | Specification-First (RDF/SPARQL) | Runtime-Adaptive (LLM/Reflexion) |
| Determinism | Guaranteed (hash-based) | Learned (pattern-based) |
| Memory Model | Stateless (per-generation) | Stateful (Letta Archives) |
| Observability | OpenTelemetry | OpenTelemetry + Prometheus |
| Compliance | SOC2/HIPAA/GDPR | Full audit trails |
| # | Component | File | Lines | Purpose |
|---|---|---|---|---|
| 1 | Company Operations | company_operations.py |
~400 | GitHub/Deployment automation |
| 2 | Cancellation Token System | cancellation.py |
~300 | 7 reasons, 3 modes, token propagation |
| 3 | Resource Scheduler | resource_scheduler.py |
~350 | Memory/CPU/GPU constraints (3090) |
| 4 | Agent Mail Protocol | agents/mail.py |
~500 | 7 message types, 5 priorities, 3 delivery modes |
| 5 | Git Checkpoint Manager | git_checkpoint.py |
~400 | 5 triggers, 3 cleanup modes, rollback |
| 6 | Cloud Storage Backends | storage/factory.py |
~200 | S3, GCS, Azure, Local |
| 7 | Artifact Store | artifact_store.py |
~300 | Build output tracking with metadata |
| 8 | Semantic Memory | semantic_memory.py |
~600 | Embeddings, model tracking, Letta |
| 9 | Design Patterns | patterns.py |
~800 | Strategy, Chain, Builder, Factory, Decorator |
| 10 | Memory Store | memory.py |
~309 | Letta 0.16+ Archives API |
| 11 | Reflexion Loop | reflexion.py |
~700 | Self-improving execution (Shinn 2023) |
| 12 | Models + State Machine | models.py |
~800 | Full state machine, 40+ events |
| 13 | Validator Framework | validators.py |
~400 | Pluggable validation system |
| 14 | Orchestrator | orchestrator.py |
~600 | Multi-agent orchestration |
| 15 | OpenTelemetry Tracer | instrumentation/tracer.py |
~500 | Distributed tracing |
| 16 | Prometheus Metrics | instrumentation/metrics.py |
~400 | Counter, Histogram, Gauge |
| 17 | Retry Engine | retry.py |
~350 | Exponential backoff, jitter |
| 18 | Agent Registry | agents/registry.py |
~600 | Capability discovery, routing |
Total Discovered: ~7,600 lines additional infrastructure
A = μ(O)
Where:
A = Generated code artifacts
μ = Measurement function (ggen code generator)
O = Ontological specification (RDF/Turtle)
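As a toy illustration of the `A = μ(O)` determinism claim, a pure function over the spec text yields the same artifact and receipt on every run. This is a sketch only; SHA-256 stands in here for ggen's blake3 (a third-party dependency), and `measure` is a hypothetical name, not ggen's API:

```python
import hashlib

def measure(spec: str) -> tuple[str, str]:
    """Toy μ: deterministic 'generation' plus a hash receipt over the spec."""
    artifact = f"// generated from spec\n{spec.strip()}"
    receipt = hashlib.sha256(spec.encode()).hexdigest()
    return artifact, receipt
```

Idempotence (`μ(O) = μ(O)`) holds trivially because there is no hidden state: same spec in, same artifact and same receipt hash out.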
| # | Contribution | Implementation |
|---|---|---|
| 1 | SPARQL CONSTRUCT Pattern Library | 8 patterns, 70+ tests |
| 2 | Semantic CLI Framework | Citty integration |
| 3 | RDF-Driven Job Scheduler | 4,038 lines, Bree |
| 4 | OpenAPI DevOps Integration | 8 job definitions |
| 5 | Production Validation | 750+ test cases |
┌───────────┐   ┌───────────┐   ┌───────────┐   ┌────────────┐   ┌───────────┐
│ Normalize │ → │  Extract  │ → │   Emit    │ → │Canonicalize│ → │  Receipt  │
│   (RDF)   │   │ (SPARQL)  │   │  (Tera)   │   │  (Format)  │   │  (Hash)   │
└───────────┘   └───────────┘   └───────────┘   └────────────┘   └───────────┘
┌──────────────────────────────────────────────────────────────────────────────┐
│ PARADIGM COMPARISON │
├───────────────────────────────────┬──────────────────────────────────────────┤
│ ggen (Deterministic) │ BLACKICE (Adaptive) │
├───────────────────────────────────┼──────────────────────────────────────────┤
│ │ │
│ RDF Specification │ Natural Language Task │
│ ↓ │ ↓ │
│ SHACL Validation │ SafetyGuard + CostTracker │
│ ↓ │ ↓ │
│ SPARQL CONSTRUCT │ LLMRouter (Model Selection) │
│ ↓ │ ↓ │
│ Tera Templates │ DAGExecutor + WorktreePool │
│ ↓ │ ↓ │
│ Deterministic Code │ Reflexion Loop (Self-Improve) │
│ ↓ │ ↓ │
│ blake3 Hash Receipt │ Beads Event Store │
│ │ ↓ │
│ │ LettaAdapter (Memory) │
│ │ │
└───────────────────────────────────┴──────────────────────────────────────────┘
| Guarantee | ggen | BLACKICE |
|---|---|---|
| Determinism | Mathematical (same spec → same code) | Statistical (learning improves over time) |
| Reproducibility | Hash-verified | Event-sourced |
| Auditability | Spec commit traces to code | Full Beads event log |
| Completeness | SHACL validation before generation | Validator framework at runtime |
| Recovery | Re-run from spec | RecoveryManager + DeadLetterQueue |
| Feature | ggen | BLACKICE |
|---|---|---|
| Tracing | OpenTelemetry (spans) | OpenTelemetry + custom tracer |
| Metrics | None documented | Prometheus (counters, histograms, gauges) |
| SLA Monitoring | p50/p95/p99 percentiles | CostTracker (tokens/time budgets) |
| Audit Logging | SOC2/HIPAA/GDPR | Full Beads event store |
| Feature | ggen | BLACKICE |
|---|---|---|
| Specification Store | RDF/Turtle files | Beads SQLite (40+ event types) |
| Cross-Session Memory | None | LettaAdapter (Archives API) |
| Pattern Learning | None | SemanticMemory + PatternLearner |
| Recovery | Re-run pipeline | RecoveryManager + crash resume |
| Feature | ggen | BLACKICE |
|---|---|---|
| Parallelism | Sequential pipeline | DAGExecutor (worker pool) |
| Isolation | None | WorktreePool (git worktree per task) |
| Cancellation | None | CancellationToken (7 reasons, 3 modes) |
| Retry | None | Exponential backoff + DeadLetterQueue |
| Feature | ggen | BLACKICE |
|---|---|---|
| Source | RDF/SPARQL | LLM (Claude/GPT/Ollama) |
| Templates | Tera | Design Patterns (5 types) |
| Validation | SHACL pre-generation | Validator framework post-execution |
| Learning | None | Reflexion (6 quality dimensions) |
Theorem (Determinism):
∀ O: μ(O) = μ(O) (idempotent)
Theorem (Auditability):
blake3(O) → A (specification hash determines code)
Theorem (Ontological Closure):
H(A | O) = 0 (no information in A not in O)
Theorem (Convergence):
lim_{n→∞} P(success | task, history_n) → 1
Theorem (Recovery):
∀ crash: ∃ checkpoint. resume(checkpoint) recovers state
Theorem (Cost Bounded):
tokens_used ≤ max_tokens_per_task
time_elapsed ≤ max_time_per_task
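The Cost Bounded guarantee can be enforced with a pre-charge check that refuses work before the cap is crossed, so `tokens_used ≤ max_tokens_per_task` holds as an invariant. The names below (`CostTracker`, `charge`) are illustrative, not BLACKICE's actual API:

```python
class BudgetExceeded(Exception):
    """Raised when a charge would push usage past the hard cap."""

class CostTracker:
    def __init__(self, max_tokens: int) -> None:
        self.max_tokens = max_tokens
        self.used = 0

    def charge(self, tokens: int) -> None:
        # Refuse *before* spending: usage never exceeds the cap
        if self.used + tokens > self.max_tokens:
            raise BudgetExceeded(f"{self.used} + {tokens} > {self.max_tokens}")
        self.used += tokens
```

Checking before incrementing (rather than clamping after) is what turns the theorem into an invariant rather than a best-effort metric.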
┌──────────────────────────────────────────────────────────────────────────┐
│ HYBRID ARCHITECTURE │
├──────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ ggen (Specification Layer) │ │
│ │ RDF Specs → SHACL Validation → SPARQL Transform → Tera Emit │ │
│ └─────────────────────────┬───────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ BLACKICE (Execution Layer) │ │
│ │ SafetyGuard → LLMRouter → DAGExecutor → Reflexion → Letta │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
│ Key: ggen provides deterministic scaffolding │
│ BLACKICE provides adaptive execution │
│ │
└──────────────────────────────────────────────────────────────────────────┘
- ggen generates BLACKICE config (RDF → YAML)
- BLACKICE learns from ggen patterns (SPARQL → Reflexion)
- Shared observability (both use OpenTelemetry)
- Unified compliance (ggen SOC2 + BLACKICE audit trails)
- Combined validation (SHACL + Validators)
The thesis defines 8 production-grade SPARQL CONSTRUCT patterns:
| # | Pattern | Use Case |
|---|---|---|
| 1 | OPTIONAL | Safe property enrichment with NULL handling |
| 2 | BIND | Computed values and type-safe derivation |
| 3 | FILTER | Conditional output with pattern matching |
| 4 | UNION | Polymorphic matching across types |
| 5 | GROUP_CONCAT | Aggregation without data loss |
| 6 | VALUES | Parameterization, injection-safe |
| 7 | EXISTS/NOT EXISTS | Graph logic and reasoning |
| 8 | Property Paths | Transitive navigation (depth-unknown) |
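As one concrete illustration, pattern 8 (Property Paths) uses SPARQL's `+` operator for transitive closure over dependencies of unknown depth. The namespace below is illustrative, not necessarily the thesis's actual ontology:

```sparql
# Materialize the transitive dependency closure of every task
PREFIX task: <http://blackice.dev/ontology/task#>
CONSTRUCT { ?task task:transitivelyDependsOn ?dep }
WHERE     { ?task task:dependsOn+ ?dep }
```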
| Repository | Python | TypeScript | Total |
|---|---|---|---|
| ggen/thesis | 0 | ~8,748 | ~8,748 |
| BLACKICE/ralph | ~54,000 | 0 | ~54,000 |
| Repository | Test Cases | Phases Covered |
|---|---|---|
| ggen | 750+ | 7 (spec→deploy) |
| BLACKICE | Unknown | Runtime execution |
| Category | ggen | BLACKICE |
|---|---|---|
| Specification | ★★★★★ | ★★☆☆☆ |
| Validation | ★★★★★ | ★★★☆☆ |
| Code Generation | ★★★★☆ | ★★★★★ |
| Execution | ★★★☆☆ | ★★★★★ |
| Observability | ★★★★☆ | ★★★★★ |
| Memory/Learning | ★☆☆☆☆ | ★★★★★ |
| Recovery | ★★☆☆☆ | ★★★★★ |
- Add Letta integration for cross-session memory
- Implement Reflexion patterns for self-improving specs
- Add DAG execution for parallel spec processing
- Include cancellation tokens for long-running generations
- Add Prometheus metrics alongside OpenTelemetry
- Add RDF specification layer for enterprise schemas
- Implement SHACL validation for pre-execution checks
- Use SPARQL patterns for structured data queries
- Add deterministic hash receipts for audit trails
- Consider Tera templates for consistent code generation
class GitHubOperations:
async def create_repo(...)
async def create_pr(...)
async def merge_pr(...)
class DeploymentOperations:
async def deploy_to_staging(...)
async def deploy_to_production(...)
async def rollback(...)
class ProjectScaffolder:
    async def scaffold_project(...)

class CancellationReason(Enum):
TIMEOUT = "timeout"
USER_REQUEST = "user_request"
RESOURCE_EXHAUSTED = "resource_exhausted"
SAFETY_VIOLATION = "safety_violation"
DEPENDENCY_FAILED = "dependency_failed"
BUDGET_EXCEEDED = "budget_exceeded"
MANUAL_ABORT = "manual_abort"
class CancellationMode(Enum):
GRACEFUL = "graceful" # Finish current step
IMMEDIATE = "immediate" # Stop now, cleanup
    FORCE = "force"          # Stop now, no cleanup

@dataclass
class ResourceConstraints:
memory_mb: int = 4096
cpu_cores: int = 4
gpu_memory_mb: int = 0 # For 3090 integration
    max_concurrent: int = 10

class MessageType(Enum):
TASK_REQUEST = "task_request"
TASK_RESULT = "task_result"
STATUS_UPDATE = "status_update"
ERROR_REPORT = "error_report"
HEARTBEAT = "heartbeat"
SHUTDOWN = "shutdown"
CAPABILITY_QUERY = "capability_query"
class MessagePriority(Enum):
CRITICAL = 0 # Immediate processing
HIGH = 1 # Next available slot
NORMAL = 2 # Standard queue
LOW = 3 # Background
    DEFERRED = 4  # Process when idle

class CheckpointTrigger(Enum):
BEFORE_TOOL = "before_tool"
AFTER_SUCCESS = "after_success"
ON_ERROR = "on_error"
PERIODIC = "periodic"
MANUAL = "manual"
class CleanupMode(Enum):
KEEP_ALL = "keep_all"
KEEP_LATEST_N = "keep_latest_n"
    CLEANUP_ON_SUCCESS = "cleanup_on_success"

Each component follows similar enterprise patterns with:
- Full type hints
- Async/await support
- Error handling
- Logging integration
- Metrics emission
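A minimal skeleton of that shared shape might look like the following. The structure is illustrative only; the real components also emit metrics and use richer result types than shown here:

```python
import logging

logger = logging.getLogger(__name__)

async def run_component(payload: dict) -> dict:
    """Typed, async entry point with logging and error handling."""
    try:
        logger.info("component start")
        # Real components would do work and emit metrics here
        return {"ok": True, **payload}
    except Exception:
        logger.exception("component failed")
        raise
```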
| Strength | ggen | BLACKICE |
|---|---|---|
| Best For | Repeatable infrastructure | Adaptive problem-solving |
| Trade-off | Less flexible | Less reproducible |
| Ideal Use | DevOps pipelines | AI agent execution |
| Maturity | PhD-ready | Production-ready |
The two systems are complementary: ggen excels at specification-driven deterministic generation, while BLACKICE excels at runtime adaptation and learning. A hybrid approach would leverage ggen for stable infrastructure and BLACKICE for dynamic task execution.
Generated by Claude Code archaeology on January 7, 2026
Original gist: 303c716fa9cc17c1733aedb1758362e5
BLACKICE 2.0: Enhanced with ggen Principles - Specification layer + Receipt store
Vision: BLACKICE as base + ggen's specification rigor = Enterprise-grade adaptive AI with deterministic guarantees
| ggen Feature | BLACKICE Gap | Enhancement Value |
|---|---|---|
| RDF Specifications | Tasks are unstructured | Formal task schemas |
| SHACL Validation | Runtime-only validation | Pre-execution guarantees |
| Deterministic Hashing | No artifact verification | Audit trail integrity |
| SPARQL Patterns | Ad-hoc data queries | Structured transformations |
| Five-Stage Pipeline | Monolithic execution | Clear phase boundaries |
| Tera Templates | LLM-generated code | Consistent scaffolding |
| Ontological Closure | Statistical convergence | Mathematical proofs |
┌─────────────────────────────────────────────────────────────────────────────────┐
│ BLACKICE 2.0 ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ NEW: Specification Layer (from ggen) │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌─────────────┐ │ │
│ │ │ RDF Schema │→ │SHACL Validate│→ │SPARQL Query │→ │Tera Template│ │ │
│ │ │ (Task Specs) │ │(Pre-Execute) │ │(Transform) │ │(Scaffold) │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────┬───────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ EXISTING: Safety & Control Layer │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ SafetyGuard │ │ CostTracker │ │ LLMRouter │ │ │
│ │ │ + Policies │ │ + Budgets │ │ + Selection │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ └─────────────────────────────────────────┬───────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ EXISTING: Execution Layer │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ DAGExecutor │ │WorktreePool │ │ Reflexion │ │ │
│ │ │ + Parallel │ │ + Isolation │ │ + Learning │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ └─────────────────────────────────────────┬───────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ NEW: Verification Layer (from ggen) │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Canonicalize │→ │blake3 Hash │→ │Receipt Store │ │ │
│ │ │ (Normalize) │ │(Verify) │ │(Audit Trail) │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ └─────────────────────────────────────────┬───────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ EXISTING: Memory & Recovery Layer │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌─────────────┐ │ │
│ │ │ LettaAdapter │ │ BeadsStore │ │RecoveryMgr │ │DeadLetterQ │ │ │
│ │ │ + Archives │ │ + Events │ │ + Resume │ │ + Retry │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ └─────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────────┘
File: integrations/ralph/spec/task_ontology.ttl
@prefix task: <http://blackice.dev/ontology/task#> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
# Task Class Hierarchy
task:Task a rdfs:Class ;
rdfs:label "Base Task" ;
rdfs:comment "Root class for all BLACKICE tasks" .
task:CodeGenTask rdfs:subClassOf task:Task ;
rdfs:label "Code Generation Task" .
task:RefactorTask rdfs:subClassOf task:Task ;
rdfs:label "Refactoring Task" .
task:TestTask rdfs:subClassOf task:Task ;
rdfs:label "Testing Task" .
task:DeployTask rdfs:subClassOf task:Task ;
rdfs:label "Deployment Task" .
# Task Properties
task:hasDescription a rdf:Property ;
rdfs:domain task:Task ;
rdfs:range xsd:string .
task:hasPriority a rdf:Property ;
rdfs:domain task:Task ;
rdfs:range xsd:integer .
task:requiresModel a rdf:Property ;
rdfs:domain task:Task ;
rdfs:range task:ModelCapability .
task:maxTokenBudget a rdf:Property ;
rdfs:domain task:Task ;
rdfs:range xsd:integer .
task:maxTimeBudget a rdf:Property ;
rdfs:domain task:Task ;
rdfs:range xsd:duration .
task:dependsOn a rdf:Property ;
rdfs:domain task:Task ;
    rdfs:range task:Task .

File: integrations/ralph/spec/task_shapes.ttl
@prefix sh: <http://www.w3.org/ns/shacl#> .
@prefix task: <http://blackice.dev/ontology/task#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
task:TaskShape a sh:NodeShape ;
sh:targetClass task:Task ;
sh:property [
sh:path task:hasDescription ;
sh:minCount 1 ;
sh:maxCount 1 ;
sh:datatype xsd:string ;
sh:minLength 10 ;
sh:message "Task must have a description of at least 10 characters"
] ;
sh:property [
sh:path task:hasPriority ;
sh:minCount 1 ;
sh:datatype xsd:integer ;
sh:minInclusive 0 ;
sh:maxInclusive 4 ;
sh:message "Priority must be 0-4 (P0=critical, P4=backlog)"
] ;
sh:property [
sh:path task:maxTokenBudget ;
sh:minCount 1 ;
sh:datatype xsd:integer ;
sh:minInclusive 1000 ;
sh:maxInclusive 1000000 ;
sh:message "Token budget must be 1K-1M"
] .
task:CodeGenTaskShape a sh:NodeShape ;
sh:targetClass task:CodeGenTask ;
sh:property [
sh:path task:targetLanguage ;
sh:minCount 1 ;
sh:in ("python" "typescript" "rust" "go" "elixir") ;
sh:message "Code generation requires target language"
] ;
sh:property [
sh:path task:outputPath ;
sh:minCount 1 ;
sh:pattern "^[a-zA-Z0-9_/.-]+$" ;
sh:message "Output path must be valid file path"
] .

File: integrations/ralph/spec/validator.py
"""
SHACL-based specification validator for BLACKICE 2.0.
Validates task specifications before execution, ensuring:
1. All required fields present
2. Data types correct
3. Constraints satisfied
4. Dependencies valid
"""
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
from enum import Enum
import hashlib
# Use pyshacl for validation
try:
from pyshacl import validate as shacl_validate
SHACL_AVAILABLE = True
except ImportError:
SHACL_AVAILABLE = False
from rdflib import Graph, Namespace, URIRef
from rdflib.namespace import RDF, RDFS, XSD
TASK = Namespace("http://blackice.dev/ontology/task#")
class ValidationSeverity(Enum):
"""Validation result severity levels."""
INFO = "info"
WARNING = "warning"
ERROR = "error"
FATAL = "fatal"
@dataclass
class ValidationResult:
"""Result of specification validation."""
valid: bool
severity: ValidationSeverity
message: str
path: Optional[str] = None
value: Optional[str] = None
@dataclass
class SpecificationReceipt:
"""Cryptographic receipt for validated specification."""
spec_hash: str # blake3 hash of spec
shapes_hash: str # blake3 hash of shapes used
timestamp: str
validation_passed: bool
results: list[ValidationResult]
class SpecificationValidator:
"""
Validates task specifications against SHACL shapes.
This brings ggen's pre-execution validation to BLACKICE,
ensuring tasks are well-formed before execution begins.
"""
def __init__(
self,
shapes_path: Optional[Path] = None,
ontology_path: Optional[Path] = None
):
self.shapes_graph = Graph()
self.ontology_graph = Graph()
# Load default shapes if not provided
if shapes_path:
self.shapes_graph.parse(shapes_path, format="turtle")
if ontology_path:
self.ontology_graph.parse(ontology_path, format="turtle")
def validate_spec(self, spec_graph: Graph) -> tuple[bool, list[ValidationResult]]:
"""
Validate a specification graph against SHACL shapes.
Returns:
Tuple of (is_valid, list of validation results)
"""
results = []
if not SHACL_AVAILABLE:
# Fallback to basic validation
return self._basic_validate(spec_graph)
# Run SHACL validation
conforms, results_graph, results_text = shacl_validate(
spec_graph,
shacl_graph=self.shapes_graph,
ont_graph=self.ontology_graph,
inference='rdfs',
abort_on_first=False
)
# Parse results
if not conforms:
for result in results_graph.subjects(RDF.type, URIRef("http://www.w3.org/ns/shacl#ValidationResult")):
severity = self._get_severity(results_graph, result)
message = str(results_graph.value(result, URIRef("http://www.w3.org/ns/shacl#resultMessage")))
path = str(results_graph.value(result, URIRef("http://www.w3.org/ns/shacl#resultPath")))
results.append(ValidationResult(
valid=False,
severity=severity,
message=message,
path=path
))
return conforms, results
def _basic_validate(self, spec_graph: Graph) -> tuple[bool, list[ValidationResult]]:
"""Basic validation without pyshacl."""
results = []
valid = True
# Check for required task properties
for task in spec_graph.subjects(RDF.type, TASK.Task):
# Check description
if not spec_graph.value(task, TASK.hasDescription):
results.append(ValidationResult(
valid=False,
severity=ValidationSeverity.ERROR,
message="Task missing required description",
path=str(task)
))
valid = False
# Check priority
priority = spec_graph.value(task, TASK.hasPriority)
if priority is None:
results.append(ValidationResult(
valid=False,
severity=ValidationSeverity.ERROR,
message="Task missing required priority",
path=str(task)
))
valid = False
elif int(priority) not in range(5):
results.append(ValidationResult(
valid=False,
severity=ValidationSeverity.ERROR,
message=f"Priority {priority} out of range 0-4",
path=str(task)
))
valid = False
return valid, results
def _get_severity(self, graph: Graph, result: URIRef) -> ValidationSeverity:
"""Extract severity from SHACL result."""
severity_uri = graph.value(result, URIRef("http://www.w3.org/ns/shacl#resultSeverity"))
if severity_uri:
severity_str = str(severity_uri).split("#")[-1].lower()
return ValidationSeverity(severity_str) if severity_str in {s.value for s in ValidationSeverity} else ValidationSeverity.ERROR
return ValidationSeverity.ERROR
def create_receipt(
self,
spec_graph: Graph,
validation_results: list[ValidationResult]
) -> SpecificationReceipt:
"""
Create cryptographic receipt for specification.
This implements ggen's deterministic hashing for audit trails.
"""
from datetime import datetime
# Serialize spec to canonical form
spec_bytes = spec_graph.serialize(format="nt").encode()
shapes_bytes = self.shapes_graph.serialize(format="nt").encode()
# Hash with blake3, falling back to SHA-256 when blake3 is not installed
try:
import blake3
spec_hash = blake3.blake3(spec_bytes).hexdigest()
shapes_hash = blake3.blake3(shapes_bytes).hexdigest()
except ImportError:
spec_hash = hashlib.sha256(spec_bytes).hexdigest()
shapes_hash = hashlib.sha256(shapes_bytes).hexdigest()
return SpecificationReceipt(
spec_hash=spec_hash,
shapes_hash=shapes_hash,
timestamp=datetime.utcnow().isoformat(),
validation_passed=all(r.valid for r in validation_results),
results=validation_results
)
class TaskSpecBuilder:
"""
Builder for creating valid task specifications.
Implements ggen's Builder pattern for type-safe spec construction.
"""
def __init__(self):
self.graph = Graph()
self.graph.bind("task", TASK)
self._task_uri = None
self._task_type = TASK.Task
def task(self, task_id: str) -> "TaskSpecBuilder":
"""Start building a task specification."""
self._task_uri = TASK[task_id]
self.graph.add((self._task_uri, RDF.type, self._task_type))
return self
def of_type(self, task_type: str) -> "TaskSpecBuilder":
"""Set the task type."""
type_map = {
"codegen": TASK.CodeGenTask,
"refactor": TASK.RefactorTask,
"test": TASK.TestTask,
"deploy": TASK.DeployTask
}
self._task_type = type_map.get(task_type, TASK.Task)
self.graph.set((self._task_uri, RDF.type, self._task_type))
return self
def description(self, desc: str) -> "TaskSpecBuilder":
"""Set task description."""
from rdflib import Literal
self.graph.add((self._task_uri, TASK.hasDescription, Literal(desc)))
return self
def priority(self, p: int) -> "TaskSpecBuilder":
"""Set task priority (0-4)."""
from rdflib import Literal
self.graph.add((self._task_uri, TASK.hasPriority, Literal(p, datatype=XSD.integer)))
return self
def token_budget(self, tokens: int) -> "TaskSpecBuilder":
"""Set maximum token budget."""
from rdflib import Literal
self.graph.add((self._task_uri, TASK.maxTokenBudget, Literal(tokens, datatype=XSD.integer)))
return self
def depends_on(self, *task_ids: str) -> "TaskSpecBuilder":
"""Add task dependencies."""
for tid in task_ids:
self.graph.add((self._task_uri, TASK.dependsOn, TASK[tid]))
return self
def build(self) -> Graph:
"""Build and return the specification graph."""
return self.graph

File: integrations/ralph/spec/queries.py
"""
SPARQL query patterns for BLACKICE 2.0.
Implements ggen's 8 CONSTRUCT patterns adapted for task processing.
"""
from dataclasses import dataclass
from typing import Optional
from rdflib import Graph
from rdflib.plugins.sparql import prepareQuery
@dataclass
class QueryPattern:
"""A reusable SPARQL pattern."""
name: str
description: str
query: str
# Pattern 1: OPTIONAL - Enrich tasks with optional metadata
ENRICH_TASK_METADATA = QueryPattern(
name="enrich_task_metadata",
description="Add optional metadata to tasks",
query="""
PREFIX task: <http://blackice.dev/ontology/task#>
CONSTRUCT {
?task a task:EnrichedTask ;
task:hasDescription ?desc ;
task:hasPriority ?priority ;
task:hasEstimatedTokens ?tokens ;
task:hasEstimatedTime ?time ;
task:hasMetadata ?hasMetadata .
}
WHERE {
?task a task:Task ;
task:hasDescription ?desc ;
task:hasPriority ?priority .
OPTIONAL {
?task task:maxTokenBudget ?tokens .
}
OPTIONAL {
?task task:maxTimeBudget ?time .
}
BIND(BOUND(?tokens) || BOUND(?time) AS ?hasMetadata)
}
"""
)
# Pattern 2: BIND - Compute derived properties
COMPUTE_TASK_COMPLEXITY = QueryPattern(
name="compute_task_complexity",
description="Calculate task complexity score",
query="""
PREFIX task: <http://blackice.dev/ontology/task#>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
CONSTRUCT {
?task task:complexityScore ?score ;
task:complexityCategory ?category .
}
WHERE {
?task a task:Task ;
task:maxTokenBudget ?tokens ;
task:hasPriority ?priority .
BIND((?tokens / 10000) + (4 - ?priority) AS ?rawScore)
BIND(xsd:integer(?rawScore) AS ?score)
BIND(
IF(?score > 10, "high",
IF(?score > 5, "medium", "low"))
AS ?category)
}
"""
)
# Pattern 3: FILTER - Select ready tasks
SELECT_READY_TASKS = QueryPattern(
name="select_ready_tasks",
description="Find tasks with no unfinished dependencies",
query="""
PREFIX task: <http://blackice.dev/ontology/task#>
CONSTRUCT {
?task a task:ReadyTask ;
task:hasDescription ?desc ;
task:hasPriority ?priority .
}
WHERE {
?task a task:Task ;
task:hasDescription ?desc ;
task:hasPriority ?priority ;
task:status "pending" .
FILTER NOT EXISTS {
?task task:dependsOn ?dep .
?dep task:status ?depStatus .
FILTER(?depStatus != "completed")
}
}
"""
)
# Pattern 4: UNION - Collect all task artifacts
COLLECT_TASK_ARTIFACTS = QueryPattern(
name="collect_task_artifacts",
description="Gather all artifacts from task execution",
query="""
PREFIX task: <http://blackice.dev/ontology/task#>
CONSTRUCT {
?task task:hasArtifact ?artifact .
}
WHERE {
?task a task:Task .
{
?task task:generatedCode ?artifact .
} UNION {
?task task:generatedTest ?artifact .
} UNION {
?task task:generatedDoc ?artifact .
}
}
"""
)
# Pattern 5: GROUP_CONCAT - Summarize task history
SUMMARIZE_TASK_HISTORY = QueryPattern(
name="summarize_task_history",
description="Aggregate task execution history",
query="""
PREFIX task: <http://blackice.dev/ontology/task#>
CONSTRUCT {
?task task:attemptSummary ?summary ;
task:attemptCount ?count .
}
WHERE {
{
SELECT ?task
(GROUP_CONCAT(?attemptResult; separator=", ") AS ?summary)
(COUNT(?attempt) AS ?count)
WHERE {
?task a task:Task .
?attempt task:attemptOf ?task ;
task:result ?attemptResult .
}
GROUP BY ?task
}
}
"""
)
# Pattern 6: VALUES - Parameterized task query
QUERY_TASKS_BY_TYPE = QueryPattern(
name="query_tasks_by_type",
description="Find tasks of specific types",
query="""
PREFIX task: <http://blackice.dev/ontology/task#>
CONSTRUCT {
?task a task:SelectedTask ;
task:hasDescription ?desc ;
task:taskType ?type .
}
WHERE {
VALUES ?type { task:CodeGenTask task:TestTask }
?task a ?type ;
task:hasDescription ?desc .
}
"""
)
# Pattern 7: EXISTS - Find blocked tasks
FIND_BLOCKED_TASKS = QueryPattern(
name="find_blocked_tasks",
description="Identify tasks blocked by dependencies",
query="""
PREFIX task: <http://blackice.dev/ontology/task#>
CONSTRUCT {
?task a task:BlockedTask ;
task:blockedBy ?blocker .
}
WHERE {
?task a task:Task ;
task:dependsOn ?blocker .
FILTER EXISTS {
?blocker task:status ?status .
FILTER(?status IN ("pending", "in_progress", "failed"))
}
}
"""
)
# Pattern 8: Property Paths - Find transitive dependencies
FIND_ALL_DEPENDENCIES = QueryPattern(
name="find_all_dependencies",
description="Find all transitive task dependencies",
query="""
PREFIX task: <http://blackice.dev/ontology/task#>
CONSTRUCT {
?task task:transitivelyDependsOn ?dep .
}
WHERE {
?task a task:Task .
?task task:dependsOn+ ?dep .
}
"""
)
class QueryExecutor:
"""Execute SPARQL patterns against task graphs."""
def __init__(self):
self.patterns = {
"enrich": ENRICH_TASK_METADATA,
"complexity": COMPUTE_TASK_COMPLEXITY,
"ready": SELECT_READY_TASKS,
"artifacts": COLLECT_TASK_ARTIFACTS,
"history": SUMMARIZE_TASK_HISTORY,
"by_type": QUERY_TASKS_BY_TYPE,
"blocked": FIND_BLOCKED_TASKS,
"dependencies": FIND_ALL_DEPENDENCIES
}
def execute(self, graph: Graph, pattern_name: str) -> Graph:
"""Execute a named pattern against a graph."""
pattern = self.patterns.get(pattern_name)
if not pattern:
raise ValueError(f"Unknown pattern: {pattern_name}")
result = graph.query(pattern.query)
return result.graph
def execute_pipeline(self, graph: Graph, *pattern_names: str) -> Graph:
"""Execute multiple patterns in sequence."""
result = graph
for name in pattern_names:
result = self.execute(result, name)
return result

File: integrations/ralph/spec/receipt_store.py
"""
Receipt store for BLACKICE 2.0 audit trails.
Implements ggen's cryptographic receipt system for compliance.
"""
import json
import sqlite3
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
from typing import Optional, List
import hashlib
try:
import blake3
BLAKE3_AVAILABLE = True
except ImportError:
BLAKE3_AVAILABLE = False
@dataclass
class ExecutionReceipt:
"""Immutable receipt of task execution."""
receipt_id: str
task_id: str
spec_hash: str
input_hash: str
output_hash: str
model_used: str
tokens_used: int
time_elapsed_ms: int
status: str # success, failed, cancelled
timestamp: str
parent_receipt_id: Optional[str] = None # For retries
def to_json(self) -> str:
return json.dumps(asdict(self), indent=2)
@classmethod
def from_json(cls, data: str) -> "ExecutionReceipt":
return cls(**json.loads(data))
class ReceiptStore:
"""
Append-only store for execution receipts.
Provides SOC2/HIPAA/GDPR-compliant audit trails.
"""
def __init__(self, db_path: Path = Path("~/.blackice/receipts.db")):
self.db_path = db_path.expanduser()
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self._init_db()
def _init_db(self):
"""Initialize SQLite database."""
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS receipts (
receipt_id TEXT PRIMARY KEY,
task_id TEXT NOT NULL,
spec_hash TEXT NOT NULL,
input_hash TEXT NOT NULL,
output_hash TEXT NOT NULL,
model_used TEXT NOT NULL,
tokens_used INTEGER NOT NULL,
time_elapsed_ms INTEGER NOT NULL,
status TEXT NOT NULL,
timestamp TEXT NOT NULL,
parent_receipt_id TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_task_id ON receipts(task_id)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_spec_hash ON receipts(spec_hash)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_timestamp ON receipts(timestamp)
""")
def store(self, receipt: ExecutionReceipt) -> str:
"""Store a receipt (append-only, never update)."""
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
INSERT INTO receipts (
receipt_id, task_id, spec_hash, input_hash, output_hash,
model_used, tokens_used, time_elapsed_ms, status,
timestamp, parent_receipt_id
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
receipt.receipt_id, receipt.task_id, receipt.spec_hash,
receipt.input_hash, receipt.output_hash, receipt.model_used,
receipt.tokens_used, receipt.time_elapsed_ms, receipt.status,
receipt.timestamp, receipt.parent_receipt_id
))
return receipt.receipt_id
def get(self, receipt_id: str) -> Optional[ExecutionReceipt]:
"""Retrieve a receipt by ID."""
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
row = conn.execute(
"SELECT * FROM receipts WHERE receipt_id = ?",
(receipt_id,)
).fetchone()
if row:
return ExecutionReceipt(
receipt_id=row["receipt_id"],
task_id=row["task_id"],
spec_hash=row["spec_hash"],
input_hash=row["input_hash"],
output_hash=row["output_hash"],
model_used=row["model_used"],
tokens_used=row["tokens_used"],
time_elapsed_ms=row["time_elapsed_ms"],
status=row["status"],
timestamp=row["timestamp"],
parent_receipt_id=row["parent_receipt_id"]
)
return None
def get_by_task(self, task_id: str) -> List[ExecutionReceipt]:
"""Get all receipts for a task (execution history)."""
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
rows = conn.execute(
"SELECT * FROM receipts WHERE task_id = ? ORDER BY timestamp",
(task_id,)
).fetchall()
# Drop the created_at bookkeeping column, which is not a dataclass field
return [ExecutionReceipt(**{k: row[k] for k in row.keys() if k != "created_at"}) for row in rows]
def verify_chain(self, task_id: str) -> bool:
"""Verify receipt chain integrity for a task."""
receipts = self.get_by_task(task_id)
for i, receipt in enumerate(receipts[1:], 1):
if receipt.parent_receipt_id != receipts[i-1].receipt_id:
return False
return True
def export_audit_log(
self,
start_date: Optional[str] = None,
end_date: Optional[str] = None
) -> str:
"""Export receipts as JSON for compliance auditing."""
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
query = "SELECT * FROM receipts"
params = []
if start_date or end_date:
conditions = []
if start_date:
conditions.append("timestamp >= ?")
params.append(start_date)
if end_date:
conditions.append("timestamp <= ?")
params.append(end_date)
query += " WHERE " + " AND ".join(conditions)
query += " ORDER BY timestamp"
rows = conn.execute(query, params).fetchall()
receipts = [dict(row) for row in rows]
return json.dumps({
"export_timestamp": datetime.utcnow().isoformat(),
"receipt_count": len(receipts),
"receipts": receipts
}, indent=2)
def create_receipt(
task_id: str,
spec_hash: str,
input_data: bytes,
output_data: bytes,
model_used: str,
tokens_used: int,
time_elapsed_ms: int,
status: str,
parent_receipt_id: Optional[str] = None
) -> ExecutionReceipt:
"""Factory function to create a receipt with proper hashing."""
def hash_bytes(data: bytes) -> str:
if BLAKE3_AVAILABLE:
return blake3.blake3(data).hexdigest()
return hashlib.sha256(data).hexdigest()
# Generate receipt ID from all fields
receipt_content = f"{task_id}:{spec_hash}:{hash_bytes(input_data)}:{hash_bytes(output_data)}:{model_used}:{tokens_used}:{time_elapsed_ms}:{status}"
receipt_id = hash_bytes(receipt_content.encode())[:16]
return ExecutionReceipt(
receipt_id=receipt_id,
task_id=task_id,
spec_hash=spec_hash,
input_hash=hash_bytes(input_data),
output_hash=hash_bytes(output_data),
model_used=model_used,
tokens_used=tokens_used,
time_elapsed_ms=time_elapsed_ms,
status=status,
timestamp=datetime.utcnow().isoformat(),
parent_receipt_id=parent_receipt_id
)

# In enterprise_flywheel.py - add specification layer
from integrations.ralph.spec.validator import SpecificationValidator, TaskSpecBuilder
from integrations.ralph.spec.queries import QueryExecutor
from integrations.ralph.spec.receipt_store import ReceiptStore, create_receipt
class EnterpriseFlywheel:
"""Enhanced flywheel with ggen specification layer."""
def __init__(self, config: EnterpriseFlywheelConfig):
# Existing components
self.beads = BeadsClient(config.beads_db_path)
self.safety_guard = SafetyGuard(config.allowed_policies)
self.cost_tracker = CostTracker(...)
self.llm_router = LLMRouter(config)
self.dag_executor = DAGExecutor(...)
self.worktree_pool = WorktreePool(...)
self.reflexion = ReflexionLoop(...)
self.letta_adapter = LettaAdapter()
self.recovery_manager = RecoveryManager(...)
self.dead_letter_queue = DeadLetterQueue(...)
# NEW: ggen-inspired components
self.spec_validator = SpecificationValidator(
shapes_path=config.shapes_path,
ontology_path=config.ontology_path
)
self.query_executor = QueryExecutor()
self.receipt_store = ReceiptStore(config.receipt_db_path)
async def run(self, task: Task) -> FlywheelResult:
"""Execute with specification validation and receipts."""
# Phase 1: Specification (NEW)
spec_graph = self._task_to_spec(task)
valid, results = self.spec_validator.validate_spec(spec_graph)
if not valid:
return FlywheelResult(
status="rejected",
reason="Specification validation failed",
validation_results=results
)
spec_receipt = self.spec_validator.create_receipt(spec_graph, results)
# Phase 2: Query transformation (NEW)
enriched = self.query_executor.execute(spec_graph, "enrich")
ready_check = self.query_executor.execute(enriched, "ready")
# Phase 3: Existing safety checks
decision = self.safety_guard.evaluate(SafetyCheckpoint.START_OF_RUN, task)
if decision.action == SafetyAction.ABORT:
return FlywheelResult(status="aborted", reason=decision.reason)
# Phase 4: Existing execution with Reflexion
worktree = await self.worktree_pool.acquire(task.id)
try:
result = await self._execute_with_reflexion(task, worktree)
finally:
await self.worktree_pool.release(worktree)
# Phase 5: Create execution receipt (NEW)
execution_receipt = create_receipt(
task_id=task.id,
spec_hash=spec_receipt.spec_hash,
input_data=task.serialize(),
output_data=result.serialize(),
model_used=result.model_used,
tokens_used=result.tokens_used,
time_elapsed_ms=result.time_elapsed_ms,
status=result.status
)
self.receipt_store.store(execution_receipt)
return FlywheelResult(
status=result.status,
output=result.output,
spec_receipt=spec_receipt,
execution_receipt=execution_receipt
)

| Enhancement | Source | Benefit |
|---|---|---|
| Task Ontology | ggen RDF | Formal task schema |
| SHACL Validation | ggen | Pre-execution guarantees |
| 8 SPARQL Patterns | ggen | Structured queries |
| blake3 Receipts | ggen | Audit trail integrity |
| Receipt Store | ggen | SOC2/HIPAA/GDPR compliance |
| Specification Builder | ggen | Type-safe task creation |
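The blake3 receipts row relies on a hash-with-fallback pattern used by both the spec validator and the execution-receipt factory; isolated as a small helper it looks like this (a minimal sketch; `hash_bytes` and `receipt_id` are illustrative names, not from the codebase):

```python
import hashlib

try:
    import blake3  # optional dependency; SHA-256 fallback below
    _HASHER = lambda data: blake3.blake3(data).hexdigest()
except ImportError:
    _HASHER = lambda data: hashlib.sha256(data).hexdigest()

def hash_bytes(data: bytes) -> str:
    """Hash bytes with blake3 when available, SHA-256 otherwise."""
    return _HASHER(data)

def receipt_id(*fields: str) -> str:
    """Derive a short receipt ID from colon-joined fields, as the factory does."""
    return hash_bytes(":".join(fields).encode())[:16]

rid = receipt_id("task-1", "spec-hash", "in-hash", "out-hash", "gpt-4", "1200", "900", "success")
```

The same inputs always yield the same ID, which is what makes receipts replayable in audits.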
- P0: SpecificationValidator + basic shapes
- P0: ReceiptStore for audit trails
- P1: TaskSpecBuilder for type safety
- P1: SPARQL patterns for ready task selection
- P2: Full RDF ontology
- P2: Complete SHACL shapes
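The retry chain that `ReceiptStore.verify_chain` walks (each retry's `parent_receipt_id` must point at the previous attempt) can be exercised standalone; a minimal sketch with a stripped-down receipt:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Receipt:
    receipt_id: str
    parent_receipt_id: Optional[str] = None

def verify_chain(receipts: list) -> bool:
    """Each retry must point at the receipt of the attempt directly before it."""
    for prev, cur in zip(receipts, receipts[1:]):
        if cur.parent_receipt_id != prev.receipt_id:
            return False
    return True

good = [Receipt("a"), Receipt("b", parent_receipt_id="a"), Receipt("c", parent_receipt_id="b")]
bad = [Receipt("a"), Receipt("b", parent_receipt_id="a"), Receipt("c", parent_receipt_id="a")]
```

A single receipt trivially verifies, matching the store's behavior for tasks with one attempt.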
BLACKICE 2.0 = BLACKICE adaptive execution + ggen specification rigor
Original gist: b288702807548dae591a1669354c995d
BLACKICE Code Archaeology: What ChatGPT Missed - Complete analysis of 18 production-ready components
Generated: 2026-01-07 Purpose: Complete analysis of BLACKICE components discovered through code archaeology that were missing or incomplete in ChatGPT's BLACKICE-SPEC-2.0
ChatGPT's BLACKICE spec captured the high-level 12-layer architecture well but missed 18 major production-ready components already implemented in the codebase. This document catalogs every discovered capability with code locations, key interfaces, and implementation status.
What ChatGPT Missed: Full GitHub automation, Vercel/Cloudflare deployment, project scaffolding
class GitHubOperations:
"""Complete GitHub automation beyond basic git."""
async def create_repository(self, name: str, description: str, private: bool = True) -> dict
async def create_pull_request(self, repo: str, title: str, head: str, base: str, body: str) -> dict
async def merge_pull_request(self, repo: str, pr_number: int, merge_method: str = "squash") -> dict
async def create_release(self, repo: str, tag: str, name: str, body: str) -> dict
async def setup_branch_protection(self, repo: str, branch: str, rules: dict) -> dict
class DeploymentOperations:
"""Vercel + Cloudflare deployment automation."""
async def deploy_to_vercel(self, project_dir: Path, env_vars: dict) -> dict
async def setup_cloudflare_dns(self, domain: str, records: list[dict]) -> dict
async def configure_cloudflare_workers(self, worker_script: str, routes: list[str]) -> dict
class ProjectScaffolder:
"""Template-based project generation."""
templates: dict[str, ProjectTemplate] # python-cli, python-api, react-app, nextjs-app

Status: Production-ready, ChatGPT had 0% coverage
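The scaffolder's template interface is not shown beyond the `templates` dict, but template-based generation typically reduces to placeholder substitution; a hypothetical sketch with `string.Template` (the `ProjectTemplate` shape here is an assumption):

```python
import string
from dataclasses import dataclass

@dataclass
class ProjectTemplate:
    # Hypothetical shape: relative path -> file body, both may contain $placeholders
    files: dict

def render_template(template: ProjectTemplate, context: dict) -> dict:
    """Substitute $name-style placeholders in every path and file body."""
    return {
        string.Template(path).safe_substitute(context):
            string.Template(body).safe_substitute(context)
        for path, body in template.files.items()
    }

python_cli = ProjectTemplate(files={
    "pyproject.toml": '[project]\nname = "$name"\nversion = "0.1.0"\n',
    "src/$name/__init__.py": "",
})
rendered = render_template(python_cli, {"name": "myapp"})
```

`safe_substitute` leaves unknown placeholders intact rather than raising, which suits partially-filled contexts.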
What ChatGPT Missed: Cooperative cancellation with parent/child propagation, multiple cancellation modes
class CancellationReason(Enum):
TIMEOUT = "timeout"
USER_REQUEST = "user_request"
BUDGET_EXCEEDED = "budget_exceeded"
SAFETY_VIOLATION = "safety_violation"
RUN_CANCELLED = "run_cancelled"
PARENT_CANCELLED = "parent_cancelled"
ERROR = "error"
class CancellationMode(Enum):
ABORT = "abort" # Immediate termination
PAUSE = "pause" # Pause for later resume
GRACEFUL = "graceful" # Complete current operation, then stop
@dataclass
class CancellationToken:
"""Cooperative cancellation with parent/child propagation."""
id: str
mode: CancellationMode
reason: Optional[CancellationReason] = None
message: Optional[str] = None
parent: Optional['CancellationToken'] = None
children: list['CancellationToken'] = field(default_factory=list)
_cancelled: bool = False
_callbacks: list[Callable] = field(default_factory=list)
def cancel(self, reason: CancellationReason, message: str = "", mode: CancellationMode = None):
"""Cancel this token and all children."""
self._cancelled = True
self.reason = reason
self.message = message
if mode:
self.mode = mode
# Propagate to children
for child in self.children:
child.cancel(CancellationReason.PARENT_CANCELLED, f"Parent cancelled: {message}")
# Fire callbacks
for callback in self._callbacks:
callback(self)
def create_child(self) -> 'CancellationToken':
"""Create a linked child token."""
child = CancellationToken(id=f"{self.id}-{len(self.children)}", mode=self.mode, parent=self)
self.children.append(child)
return child
class CancellationScope:
"""Context manager for scoped cancellation."""
async def __aenter__(self) -> CancellationToken
async def __aexit__(self, exc_type, exc_val, exc_tb)

Status: Production-ready, ChatGPT had 0% coverage
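The token's parent/child propagation can be demonstrated in isolation; a runnable reduction of the dataclass above (callbacks and cancellation modes trimmed for brevity):

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional

class CancellationReason(Enum):
    USER_REQUEST = "user_request"
    PARENT_CANCELLED = "parent_cancelled"

@dataclass
class CancellationToken:
    id: str
    reason: Optional[CancellationReason] = None
    parent: Optional["CancellationToken"] = None
    children: list = field(default_factory=list)
    cancelled: bool = False

    def cancel(self, reason: CancellationReason, message: str = "") -> None:
        # Cancel this token, then propagate down to every child.
        self.cancelled = True
        self.reason = reason
        for child in self.children:
            child.cancel(CancellationReason.PARENT_CANCELLED, f"Parent cancelled: {message}")

    def create_child(self) -> "CancellationToken":
        child = CancellationToken(id=f"{self.id}-{len(self.children)}", parent=self)
        self.children.append(child)
        return child

root = CancellationToken(id="run-1")
child = root.create_child()
grandchild = child.create_child()
root.cancel(CancellationReason.USER_REQUEST, "stop requested")
```

Cancelling the root reaches the grandchild transitively, while the grandchild records PARENT_CANCELLED rather than the original reason.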
What ChatGPT Missed: Memory/CPU/GPU constraint enforcement, reservation system
@dataclass
class ResourceConstraint:
min_memory_mb: int = 0
max_memory_mb: int = 0
min_cpu_cores: float = 0
max_cpu_cores: float = 0
gpu_required: bool = False
gpu_memory_mb: int = 0
@dataclass
class ResourceReservation:
id: str
constraints: ResourceConstraint
task_id: str
acquired_at: datetime
expires_at: Optional[datetime] = None
class ResourceScheduler:
"""Enforces resource constraints before task execution."""
def __init__(self, config: ResourceConfig):
self.max_memory_mb = config.max_memory_mb
self.max_cpu_cores = config.max_cpu_cores
self.gpu_memory_mb = config.gpu_memory_mb
self.reservations: dict[str, ResourceReservation] = {}
async def can_schedule(self, constraints: ResourceConstraint) -> bool:
"""Check if resources are available."""
available = self._get_available_resources()
return (
available.memory_mb >= constraints.min_memory_mb and
available.cpu_cores >= constraints.min_cpu_cores and
(not constraints.gpu_required or available.gpu_memory_mb >= constraints.gpu_memory_mb)
)
async def reserve(self, task_id: str, constraints: ResourceConstraint) -> ResourceReservation:
"""Reserve resources for a task."""
async def release(self, reservation_id: str):
"""Release a reservation."""
async def wait_for_resources(self, constraints: ResourceConstraint, timeout: float = 60) -> bool:
"""Wait until resources become available."""Status: Production-ready, ChatGPT had 0% coverage
What ChatGPT Missed: Full inter-agent messaging with delivery guarantees
class MessageType(Enum):
REQUEST = "request"
RESPONSE = "response"
NOTIFICATION = "notification"
BROADCAST = "broadcast"
ACK = "ack"
NACK = "nack"
HEARTBEAT = "heartbeat"
class MessagePriority(Enum):
LOW = 0
NORMAL = 1
HIGH = 2
URGENT = 3
CRITICAL = 4
class DeliveryMode(Enum):
AT_MOST_ONCE = "at_most_once" # Fire and forget
AT_LEAST_ONCE = "at_least_once" # Retry until ACK
EXACTLY_ONCE = "exactly_once" # Dedup + retry
@dataclass
class AgentMessage:
id: str
type: MessageType
sender: str
recipient: str
payload: dict
priority: MessagePriority = MessagePriority.NORMAL
delivery_mode: DeliveryMode = DeliveryMode.AT_LEAST_ONCE
correlation_id: Optional[str] = None # For request/response pairing
reply_to: Optional[str] = None
ttl_seconds: int = 300
created_at: datetime = field(default_factory=datetime.utcnow)
retries: int = 0
max_retries: int = 3
class MessageBus:
"""Central message routing with delivery guarantees."""
async def send(self, message: AgentMessage) -> str:
"""Send a message with delivery tracking."""
async def broadcast(self, sender: str, payload: dict, priority: MessagePriority = MessagePriority.NORMAL):
"""Broadcast to all agents."""
async def request(self, sender: str, recipient: str, payload: dict, timeout: float = 30) -> AgentMessage:
"""Send request and wait for response."""
async def subscribe(self, agent_id: str, handler: Callable[[AgentMessage], Awaitable[None]]):
"""Subscribe to messages for an agent."""
class Mailbox:
"""Per-agent message queue with priority ordering."""
messages: PriorityQueue[AgentMessage]
pending_acks: dict[str, AgentMessage]
seen_ids: set[str] # For exactly-once dedupStatus

Status: Production-ready, ChatGPT had 0% coverage
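Exactly-once delivery rests on the mailbox's `seen_ids` set: an at-least-once retry of a message whose ID was already seen is accepted (so the sender stops retrying) but not redelivered. A minimal sketch of that dedup step:

```python
from dataclasses import dataclass, field

@dataclass
class Mailbox:
    """Minimal sketch of exactly-once delivery via the seen_ids set."""
    seen_ids: set = field(default_factory=set)
    delivered: list = field(default_factory=list)

    def deliver(self, message_id: str, payload: dict) -> bool:
        # A redelivered (retried) message with a known ID is dropped, not reprocessed.
        if message_id in self.seen_ids:
            return False
        self.seen_ids.add(message_id)
        self.delivered.append(payload)
        return True

box = Mailbox()
first = box.deliver("msg-1", {"cmd": "build"})
retry = box.deliver("msg-1", {"cmd": "build"})  # at-least-once retry, deduped
```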
What ChatGPT Missed: Granular checkpointing beyond worktrees
class CheckpointTrigger(Enum):
MANUAL = "manual"
ITERATION = "iteration"
TOOL_CALL = "tool_call"
SUCCESS = "success"
FAILURE = "failure"
PERIODIC = "periodic"
class CleanupMode(Enum):
KEEP_ALL = "keep_all"
KEEP_LATEST_N = "keep_latest_n"
KEEP_SUCCESSFUL = "keep_successful"
KEEP_NONE = "keep_none"
@dataclass
class GitCheckpoint:
id: str
run_id: str
iteration: int
trigger: CheckpointTrigger
commit_sha: str
branch_name: str
message: str
created_at: datetime
files_changed: list[str]
metadata: dict
class GitCheckpointManager:
"""Manages git checkpoints for rollback and recovery."""
async def create_checkpoint(
self,
run_id: str,
iteration: int,
trigger: CheckpointTrigger,
message: str = ""
) -> GitCheckpoint:
"""Create a checkpoint at current state."""
async def restore_checkpoint(self, checkpoint_id: str) -> bool:
"""Restore working directory to checkpoint state."""
async def list_checkpoints(self, run_id: str) -> list[GitCheckpoint]:
"""List all checkpoints for a run."""
async def cleanup(self, run_id: str, mode: CleanupMode, keep_n: int = 5):
"""Clean up old checkpoints."""
async def diff_checkpoints(self, from_id: str, to_id: str) -> str:
"""Get diff between two checkpoints."""Status: Production-ready, ChatGPT had 0% coverage
What ChatGPT Missed: S3/GCS/Azure blob storage abstraction
class StorageBackend(Protocol):
"""Abstract storage interface."""
async def upload(self, key: str, data: bytes, content_type: str = None) -> str
async def download(self, key: str) -> bytes
async def delete(self, key: str) -> bool
async def exists(self, key: str) -> bool
async def list_keys(self, prefix: str = "") -> list[str]
async def get_signed_url(self, key: str, expires_in: int = 3600) -> str
class S3Backend(StorageBackend):
"""AWS S3 implementation."""
def __init__(self, bucket: str, region: str, credentials: AWSCredentials)
class GCSBackend(StorageBackend):
"""Google Cloud Storage implementation."""
def __init__(self, bucket: str, project: str, credentials: GCPCredentials)
class AzureBlobBackend(StorageBackend):
"""Azure Blob Storage implementation."""
def __init__(self, container: str, connection_string: str)
class LocalBackend(StorageBackend):
"""Local filesystem for development."""
def __init__(self, base_path: Path)
class StorageFactory:
@staticmethod
def create(config: StorageConfig) -> StorageBackend:
"""Factory method to create appropriate backend."""
if config.provider == "s3":
return S3Backend(config.bucket, config.region, config.credentials)
elif config.provider == "gcs":
return GCSBackend(config.bucket, config.project, config.credentials)
elif config.provider == "azure":
return AzureBlobBackend(config.container, config.connection_string)
else:
return LocalBackend(config.base_path)

Status: Production-ready, ChatGPT had 0% coverage
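The `LocalBackend` semantics map directly onto `pathlib`; a synchronous sketch of the core operations (the real interface is async, and signed URLs are omitted since they have no filesystem equivalent):

```python
import tempfile
from pathlib import Path

class LocalBackend:
    """Synchronous sketch of the local-filesystem backend for development."""
    def __init__(self, base_path: Path):
        self.base_path = base_path

    def upload(self, key: str, data: bytes) -> str:
        path = self.base_path / key
        path.parent.mkdir(parents=True, exist_ok=True)  # keys may contain slashes
        path.write_bytes(data)
        return key

    def download(self, key: str) -> bytes:
        return (self.base_path / key).read_bytes()

    def exists(self, key: str) -> bool:
        return (self.base_path / key).exists()

    def list_keys(self, prefix: str = "") -> list:
        return sorted(
            str(p.relative_to(self.base_path))
            for p in self.base_path.rglob("*")
            if p.is_file() and str(p.relative_to(self.base_path)).startswith(prefix)
        )

store = LocalBackend(Path(tempfile.mkdtemp()))
store.upload("runs/r1/log.txt", b"hello")
```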
What ChatGPT Missed: Build output tracking with cloud storage integration
class ArtifactType(Enum):
CODE = "code"
TEST_RESULTS = "test_results"
COVERAGE = "coverage"
LOGS = "logs"
METRICS = "metrics"
MODEL_OUTPUT = "model_output"
CHECKPOINT = "checkpoint"
SCREENSHOT = "screenshot"
@dataclass
class Artifact:
id: str
run_id: str
task_id: str
type: ArtifactType
name: str
storage_key: str
size_bytes: int
content_type: str
checksum: str
created_at: datetime
metadata: dict
tags: list[str]
class ArtifactStore:
"""Manages build artifacts with cloud storage."""
def __init__(self, storage: StorageBackend, beads: BeadsClient):
self.storage = storage
self.beads = beads
async def store(
self,
run_id: str,
task_id: str,
artifact_type: ArtifactType,
name: str,
data: bytes,
content_type: str = "application/octet-stream",
metadata: dict = None,
tags: list[str] = None
) -> Artifact:
"""Store an artifact and record in Beads."""
async def retrieve(self, artifact_id: str) -> tuple[Artifact, bytes]:
"""Retrieve artifact metadata and content."""
async def list_artifacts(
self,
run_id: str = None,
task_id: str = None,
artifact_type: ArtifactType = None,
tags: list[str] = None
) -> list[Artifact]:
"""Query artifacts with filters."""
async def get_download_url(self, artifact_id: str, expires_in: int = 3600) -> str:
"""Get signed download URL."""Status: Production-ready, ChatGPT had 0% coverage
What ChatGPT Missed: Embedding-based learning with model performance tracking
class EmbeddingProvider(Protocol):
async def embed(self, text: str) -> list[float]
async def embed_batch(self, texts: list[str]) -> list[list[float]]
class OllamaEmbeddings(EmbeddingProvider):
"""Ollama embedding provider using nomic-embed-text."""
def __init__(self, base_url: str = "http://localhost:11434", model: str = "nomic-embed-text"):
self.base_url = base_url
self.model = model
async def embed(self, text: str) -> list[float]:
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.base_url}/api/embeddings",
json={"model": self.model, "prompt": text}
)
return response.json()["embedding"]
@dataclass
class MemoryEntry:
id: str
content: str
embedding: list[float]
category: str # "success", "failure", "insight", "pattern"
task_type: str
model_used: str
timestamp: datetime
metadata: dict
decay_factor: float = 1.0 # For relevance decay over time
class SemanticMemory:
"""Embedding-based memory with similarity search."""
def __init__(self, embedder: EmbeddingProvider, db_path: Path):
self.embedder = embedder
self.entries: list[MemoryEntry] = []
self.model_stats: dict[str, ModelStats] = {}
async def store(self, content: str, category: str, task_type: str, model_used: str, metadata: dict = None):
"""Store content with embedding."""
embedding = await self.embedder.embed(content)
entry = MemoryEntry(
id=str(uuid4()),
content=content,
embedding=embedding,
category=category,
task_type=task_type,
model_used=model_used,
timestamp=datetime.utcnow(),
metadata=metadata or {}
)
self.entries.append(entry)
self._update_model_stats(model_used, category)
async def query_similar(self, query: str, limit: int = 5, category: str = None) -> list[MemoryEntry]:
"""Find similar entries using cosine similarity."""
query_embedding = await self.embedder.embed(query)
scored = []
for entry in self.entries:
if category and entry.category != category:
continue
similarity = self._cosine_similarity(query_embedding, entry.embedding)
# Apply decay factor
age_days = (datetime.utcnow() - entry.timestamp).days
decayed_score = similarity * (entry.decay_factor ** (age_days / 30))
scored.append((entry, decayed_score))
scored.sort(key=lambda x: x[1], reverse=True)
return [e for e, _ in scored[:limit]]
def get_model_performance(self, model: str) -> ModelStats:
"""Get success/failure stats for a model."""
return self.model_stats.get(model, ModelStats())
@staticmethod
def _cosine_similarity(a: list[float], b: list[float]) -> float:
dot = sum(x * y for x, y in zip(a, b))
norm_a = sum(x * x for x in a) ** 0.5
norm_b = sum(x * x for x in b) ** 0.5
return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0
Status: Production-ready, ChatGPT had partial coverage (mentioned memory but missed embeddings)
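The similarity-plus-decay scoring used by `query_similar` can be exercised standalone. A minimal sketch (the `0.9` decay factor here is an illustrative value; the dataclass above defaults to `1.0`, i.e. no decay):

```python
def cosine_similarity(a: list[float], b: list[float]) -> float:
    # Same formula as SemanticMemory._cosine_similarity above.
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = sum(x * x for x in a) ** 0.5
    norm_b = sum(x * x for x in b) ** 0.5
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0

def decayed_score(similarity: float, age_days: int, decay_factor: float = 1.0) -> float:
    # Mirrors query_similar: older entries lose weight via decay_factor ** (age_days / 30).
    return similarity * (decay_factor ** (age_days / 30))

identical = cosine_similarity([1.0, 0.0], [1.0, 0.0])      # identical vectors score 1.0
orthogonal = cosine_similarity([1.0, 0.0], [0.0, 1.0])     # orthogonal vectors score 0.0
stale = decayed_score(1.0, age_days=60, decay_factor=0.9)  # two 30-day periods: 0.9 ** 2
```

With `decay_factor=1.0` a perfect match never loses rank; any factor below 1.0 gradually demotes old memories relative to fresh ones.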
What ChatGPT Missed: Formal design pattern implementations
# Strategy Pattern
class CodeExtractor(Protocol):
"""Strategy for extracting code from LLM responses."""
def extract(self, response: str) -> list[CodeBlock]: ...
class MarkdownExtractor(CodeExtractor):
"""Extract code from markdown fenced blocks."""
class XMLExtractor(CodeExtractor):
"""Extract code from XML tags."""
class MixedExtractor(CodeExtractor):
"""Try multiple extractors."""
# Chain of Responsibility
class ChainableValidator(ABC):
"""Base class for validation chain."""
_next: Optional['ChainableValidator'] = None
def set_next(self, handler: 'ChainableValidator') -> 'ChainableValidator':
self._next = handler
return handler
@abstractmethod
def validate(self, context: ValidationContext) -> ValidationResult: ...
def _pass_to_next(self, context: ValidationContext) -> ValidationResult:
if self._next:
return self._next.validate(context)
return ValidationResult(passed=True)
class SyntaxValidator(ChainableValidator):
"""Validate syntax."""
class SecurityValidator(ChainableValidator):
"""Check for security issues."""
class TestValidator(ChainableValidator):
"""Run tests."""
# Builder Pattern
class PromptBuilder:
"""Fluent builder for complex prompts."""
def __init__(self):
self._system = ""
self._context = []
self._examples = []
self._instructions = []
self._constraints = []
def with_system(self, system: str) -> 'PromptBuilder':
self._system = system
return self
def with_context(self, context: str) -> 'PromptBuilder':
self._context.append(context)
return self
def with_example(self, input: str, output: str) -> 'PromptBuilder':
self._examples.append({"input": input, "output": output})
return self
def with_instruction(self, instruction: str) -> 'PromptBuilder':
self._instructions.append(instruction)
return self
def with_constraint(self, constraint: str) -> 'PromptBuilder':
self._constraints.append(constraint)
return self
def build(self) -> str:
"""Build the final prompt."""
# Factory Pattern
class ProjectConfigFactory:
"""Factory for project configurations."""
_configs: dict[str, type[ProjectConfig]] = {}
@classmethod
def register(cls, project_type: str, config_class: type[ProjectConfig]):
cls._configs[project_type] = config_class
@classmethod
def create(cls, project_type: str, **kwargs) -> ProjectConfig:
if project_type not in cls._configs:
raise ValueError(f"Unknown project type: {project_type}")
return cls._configs[project_type](**kwargs)
# Decorator Pattern
class ValidatorDecorator(ABC):
"""Base decorator for validators."""
def __init__(self, validator: Validator):
self._validator = validator
@abstractmethod
def validate(self, context: ValidationContext) -> ValidationResult: ...
class RetryValidator(ValidatorDecorator):
"""Decorator that adds retry logic."""
def __init__(self, validator: Validator, max_retries: int = 3):
super().__init__(validator)
self.max_retries = max_retries
def validate(self, context: ValidationContext) -> ValidationResult:
for attempt in range(self.max_retries):
result = self._validator.validate(context)
if result.passed:
return result
return result
class CachingValidator(ValidatorDecorator):
"""Decorator that caches validation results."""Status: Production-ready, ChatGPT had 0% coverage
What ChatGPT Missed: Full Letta 0.16+ Archives API integration
class MemoryStore:
"""
Stores and retrieves attempt records using Letta archival memory.
Updated for Letta 0.16+ Archives API.
"""
def __init__(self, config: LoopConfig):
self.config = config
self.base_url = config.letta_url
self.agent_id = config.memory_agent_id
self.headers = {
"Authorization": f"Bearer {config.letta_token}",
"Content-Type": "application/json"
}
self._archive_id: Optional[str] = None
# Local cache fallback
self.cache_dir = Path.home() / ".ralph" / "memory"
self.cache_file = self.cache_dir / "attempts.jsonl"
async def _get_or_create_archive(self, client: httpx.AsyncClient) -> Optional[str]:
"""Get or create archive for Ralph Loop memory (Letta 0.16+ API)."""
archive_name = f"ralph-loop-{self.agent_id[:8]}"
# Check if archive exists
response = await client.get(
f"{self.base_url}/v1/archives/",
headers=self.headers,
params={"name": archive_name}
)
if response.status_code == 200:
archives = response.json()
for archive in archives:
if archive.get("name") == archive_name:
self._archive_id = archive.get("id")
return self._archive_id
# Create new archive with Ollama embeddings
response = await client.post(
f"{self.base_url}/v1/archives/",
headers=self.headers,
json={
"name": archive_name,
"description": "Ralph Loop attempt history for learning",
"embedding": "ollama/nomic-embed-text:latest"
}
)
if response.status_code in (200, 201):
self._archive_id = response.json().get("id")
return self._archive_id
return None
async def store_attempt(self, attempt: AttemptRecord) -> bool:
"""Store attempt in Letta Archives API with local fallback."""
async def query_similar(self, task: str, limit: int = 5) -> list[dict]:
"""Semantic search via Letta or local keyword fallback."""
async def build_context(self, task: str) -> str:
"""Build context string from memory for prompt injection."""Status: Production-ready, ChatGPT had partial coverage (mentioned Letta but missed API details)
What ChatGPT Missed: Full self-improvement cycle with quality dimensions
class QualityDimension(Enum):
CORRECTNESS = "correctness"
COMPLETENESS = "completeness"
CODE_QUALITY = "code_quality"
EFFICIENCY = "efficiency"
SAFETY = "safety"
TESTABILITY = "testability"
@dataclass
class QualityScore:
dimension: QualityDimension
score: float # 0.0 to 1.0
confidence: float
evidence: list[str]
suggestions: list[str]
@dataclass
class Evaluation:
overall_score: float
dimension_scores: dict[QualityDimension, QualityScore]
passed: bool
grade: str # "A", "B", "C", "D", "F"
summary: str
@dataclass
class Reflection:
what_worked: list[str]
what_failed: list[str]
root_causes: list[str]
improvements: list[str]
confidence: float
@dataclass
class Learning:
insight: str
category: str # "success_pattern", "failure_pattern", "optimization"
task_type: str
model_used: str
timestamp: datetime
class ReflexionLoop:
"""
Self-improving execution loop implementing the Reflexion paper.
Flow:
1. RETRIEVE: Query memory for relevant past experiences
2. EXECUTE: Run the task with context from memory
3. EVALUATE: Score output quality across dimensions
4. REFLECT: Analyze what worked and what failed
5. LEARN: Store insights in memory
6. REFINE: Improve prompts/strategies for next iteration
"""
def __init__(self, memory: SemanticMemory, evaluator: QualityEvaluator):
self.memory = memory
self.evaluator = evaluator
self.max_iterations = 5
self.success_threshold = 0.8
async def run(self, task: str, executor: Callable) -> ReflexionResult:
"""Run the full reflexion loop."""
context = await self._retrieve(task)
for iteration in range(self.max_iterations):
# Execute with current context
output = await executor(task, context)
# Evaluate quality
evaluation = await self._evaluate(task, output)
if evaluation.passed:
# Learn from success
await self._learn_success(task, output, evaluation)
return ReflexionResult(success=True, output=output, iterations=iteration + 1)
# Reflect on failure
reflection = await self._reflect(task, output, evaluation)
# Learn from failure
await self._learn_failure(task, output, reflection)
# Refine context for next iteration
context = await self._refine(context, reflection)
return ReflexionResult(success=False, output=output, iterations=self.max_iterations)
async def _evaluate(self, task: str, output: str) -> Evaluation:
"""Evaluate output quality across all dimensions."""
dimension_scores = {}
for dimension in QualityDimension:
score = await self.evaluator.score(task, output, dimension)
dimension_scores[dimension] = score
overall = sum(s.score for s in dimension_scores.values()) / len(dimension_scores)
passed = overall >= self.success_threshold
grade = self._score_to_grade(overall)
return Evaluation(
overall_score=overall,
dimension_scores=dimension_scores,
passed=passed,
grade=grade,
summary=self._generate_summary(dimension_scores)
)
async def _reflect(self, task: str, output: str, evaluation: Evaluation) -> Reflection:
"""Generate reflection on what worked and what failed."""
# Use LLM to analyze the execution
prompt = self._build_reflection_prompt(task, output, evaluation)
reflection_text = await self._get_llm_reflection(prompt)
return self._parse_reflection(reflection_text)
@staticmethod
def _score_to_grade(score: float) -> str:
if score >= 0.9:
return "A"
elif score >= 0.8:
return "B"
elif score >= 0.7:
return "C"
elif score >= 0.6:
return "D"
else:
return "F"Status: Production-ready, ChatGPT had partial coverage (mentioned QualityScore but missed full flow)
What ChatGPT Missed: Complete state machine with all transitions
class RunState(Enum):
INIT = "init"
PLANNING = "planning"
WAITING_FOR_APPROVAL = "waiting_for_approval"
RUNNING = "running"
PAUSED = "paused"
ITERATING = "iterating"
EVALUATING = "evaluating"
REFLECTING = "reflecting"
RECOVERING = "recovering"
SUCCEEDED = "succeeded"
FAILED = "failed"
CANCELLED = "cancelled"
TIMED_OUT = "timed_out"
class TaskState(Enum):
PENDING = "pending"
QUEUED = "queued"
SCHEDULED = "scheduled"
RUNNING = "running"
BLOCKED = "blocked"
WAITING_FOR_INPUT = "waiting_for_input"
COMPLETED = "completed"
FAILED = "failed"
SKIPPED = "skipped"
CANCELLED = "cancelled"
VALID_RUN_TRANSITIONS = {
RunState.INIT: [RunState.PLANNING, RunState.RUNNING, RunState.CANCELLED],
RunState.PLANNING: [RunState.WAITING_FOR_APPROVAL, RunState.RUNNING, RunState.CANCELLED],
RunState.WAITING_FOR_APPROVAL: [RunState.RUNNING, RunState.CANCELLED],
RunState.RUNNING: [RunState.ITERATING, RunState.EVALUATING, RunState.PAUSED,
RunState.SUCCEEDED, RunState.FAILED, RunState.CANCELLED, RunState.TIMED_OUT],
RunState.PAUSED: [RunState.RUNNING, RunState.CANCELLED],
RunState.ITERATING: [RunState.EVALUATING, RunState.RUNNING, RunState.FAILED, RunState.CANCELLED],
RunState.EVALUATING: [RunState.REFLECTING, RunState.SUCCEEDED, RunState.ITERATING],
RunState.REFLECTING: [RunState.ITERATING, RunState.SUCCEEDED, RunState.FAILED],
RunState.RECOVERING: [RunState.RUNNING, RunState.FAILED],
# Terminal states have no transitions
RunState.SUCCEEDED: [],
RunState.FAILED: [],
RunState.CANCELLED: [],
RunState.TIMED_OUT: [],
}
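A transition table like `VALID_RUN_TRANSITIONS` is only useful if something enforces it. A guard sketch over a small stand-in subset of the states (the real enum and table are above; `transition` is an illustrative helper, not from the codebase):

```python
from enum import Enum

class State(Enum):
    # Small subset of RunState, enough to exercise the guard.
    INIT = "init"
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"

VALID = {
    State.INIT: [State.RUNNING],
    State.RUNNING: [State.SUCCEEDED, State.FAILED],
    State.SUCCEEDED: [],  # terminal
    State.FAILED: [],     # terminal
}

def transition(current: State, target: State) -> State:
    """Refuse illegal transitions instead of silently corrupting run state."""
    if target not in VALID[current]:
        raise ValueError(f"illegal transition: {current.value} -> {target.value}")
    return target

state = transition(State.INIT, State.RUNNING)
state = transition(state, State.SUCCEEDED)

blocked = False
try:
    transition(state, State.RUNNING)  # terminal states have no exits
except ValueError:
    blocked = True
```

Raising on a bad transition turns state-machine bugs into loud failures at the call site rather than inconsistent run records downstream.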
@dataclass
class RunContext:
"""Full context for a run."""
run_id: str
task: str
state: RunState
iteration: int
max_iterations: int
started_at: datetime
timeout_at: Optional[datetime]
model: str
config: FlywheelConfig
worktree_path: Optional[Path]
parent_run_id: Optional[str]
child_run_ids: list[str]
metadata: dict
@dataclass
class AttemptRecord:
"""Record of a single attempt."""
id: str
run_id: str
iteration: int
task: str
prompt: str
response: str
outcome: AttemptOutcome
model: str
tokens_used: int
duration_seconds: float
error: Optional[str]
timestamp: datetime
def to_memory_text(self) -> str:
"""Convert to text for memory storage."""
return f"[{self.outcome.name}] Task: {self.task[:100]}... Model: {self.model} | {self.error or 'Success'}"Status: Production-ready, ChatGPT had partial coverage (mentioned some states)
What ChatGPT Missed: Pluggable validation with composite validators
class ValidationResult(NamedTuple):
passed: bool
message: str
details: dict = {}
class Validator(Protocol):
"""Base validator protocol."""
def validate(self, context: ValidationContext) -> ValidationResult: ...
@dataclass
class ValidationContext:
"""Context passed to validators."""
run_id: str
task: str
output: str
working_dir: Path
files_changed: list[Path]
metadata: dict
class TestsPassValidator(Validator):
"""Validate that tests pass."""
def __init__(self, test_command: str = "pytest"):
self.test_command = test_command
def validate(self, context: ValidationContext) -> ValidationResult:
result = subprocess.run(
self.test_command.split(),
cwd=context.working_dir,
capture_output=True,
text=True
)
return ValidationResult(
passed=result.returncode == 0,
message="Tests passed" if result.returncode == 0 else f"Tests failed: {result.stderr}",
details={"stdout": result.stdout, "stderr": result.stderr, "returncode": result.returncode}
)
class FileExistsValidator(Validator):
"""Validate that required files exist."""
def __init__(self, required_files: list[str]):
self.required_files = required_files
def validate(self, context: ValidationContext) -> ValidationResult:
missing = [f for f in self.required_files if not (context.working_dir / f).exists()]
return ValidationResult(
passed=len(missing) == 0,
message="All files exist" if not missing else f"Missing files: {missing}",
details={"missing": missing}
)
class OutputContainsValidator(Validator):
"""Validate that output contains expected patterns."""
class SyntaxValidator(Validator):
"""Validate syntax of generated code."""
class CompositeValidator(Validator):
"""Combine multiple validators."""
def __init__(self, validators: list[Validator], mode: str = "all"):
self.validators = validators
self.mode = mode # "all" or "any"
def validate(self, context: ValidationContext) -> ValidationResult:
results = [v.validate(context) for v in self.validators]
if self.mode == "all":
passed = all(r.passed for r in results)
else:
passed = any(r.passed for r in results)
return ValidationResult(
passed=passed,
message="; ".join(r.message for r in results),
details={"sub_results": [r._asdict() for r in results]}
)
Status: Production-ready, ChatGPT had 0% coverage
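The "all"/"any" aggregation is the interesting part of `CompositeValidator`. A self-contained sketch with toy validators (the `AlwaysPass`/`AlwaysFail` classes are illustrative stand-ins, not part of the framework):

```python
from typing import NamedTuple

class ValidationResult(NamedTuple):
    passed: bool
    message: str

class AlwaysPass:
    def validate(self, context) -> ValidationResult:
        return ValidationResult(True, "ok")

class AlwaysFail:
    def validate(self, context) -> ValidationResult:
        return ValidationResult(False, "nope")

class Composite:
    # Mirrors CompositeValidator's "all"/"any" aggregation above.
    def __init__(self, validators, mode: str = "all"):
        self.validators = validators
        self.mode = mode

    def validate(self, context) -> ValidationResult:
        results = [v.validate(context) for v in self.validators]
        agg = all if self.mode == "all" else any
        return ValidationResult(
            agg(r.passed for r in results),
            "; ".join(r.message for r in results),
        )

strict = Composite([AlwaysPass(), AlwaysFail()], mode="all")
lenient = Composite([AlwaysPass(), AlwaysFail()], mode="any")
# strict fails because one sub-validator failed; lenient passes because one succeeded
```

"all" suits quality gates (every check must hold); "any" suits fallback checks (one acceptable signal is enough).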
What ChatGPT Missed: Complete multi-agent orchestration modes
class OrchestratorMode(Enum):
SINGLE_AGENT = "single_agent"
MULTI_AGENT = "multi_agent"
WORKFLOW = "workflow"
CONSENSUS = "consensus"
class AgentRole(Enum):
PLANNER = "planner"
IMPLEMENTER = "implementer"
REVIEWER = "reviewer"
TESTER = "tester"
COORDINATOR = "coordinator"
@dataclass
class AgentAssignment:
agent_id: str
role: AgentRole
task_ids: list[str]
model: str
constraints: ResourceConstraint
class Orchestrator:
"""Multi-agent task orchestrator."""
def __init__(
self,
mode: OrchestratorMode,
agents: list[Agent],
consensus_engine: Optional[ConsensusEngine] = None,
dag_executor: Optional[DAGExecutor] = None
):
self.mode = mode
self.agents = {a.id: a for a in agents}
self.consensus_engine = consensus_engine
self.dag_executor = dag_executor
async def run(self, tasks: list[Task]) -> OrchestratorResult:
"""Execute tasks according to orchestration mode."""
if self.mode == OrchestratorMode.SINGLE_AGENT:
return await self._run_single_agent(tasks)
elif self.mode == OrchestratorMode.MULTI_AGENT:
return await self._run_multi_agent(tasks)
elif self.mode == OrchestratorMode.WORKFLOW:
return await self._run_workflow(tasks)
elif self.mode == OrchestratorMode.CONSENSUS:
return await self._run_consensus(tasks)
async def _run_multi_agent(self, tasks: list[Task]) -> OrchestratorResult:
"""Distribute tasks across multiple agents."""
assignments = await self._assign_tasks(tasks)
results = await asyncio.gather(*[
self._execute_assignment(assignment)
for assignment in assignments
])
return self._aggregate_results(results)
async def _run_consensus(self, tasks: list[Task]) -> OrchestratorResult:
"""Run tasks with consensus voting on outputs."""
for task in tasks:
# Get proposals from multiple agents
proposals = await asyncio.gather(*[
agent.propose(task) for agent in self.agents.values()
])
# Vote on best proposal
winner = await self.consensus_engine.vote(proposals)
# Execute winning proposal
await self._execute_proposal(winner)
async def _assign_tasks(self, tasks: list[Task]) -> list[AgentAssignment]:
"""Assign tasks to agents based on capabilities and load."""Status: Production-ready, ChatGPT had partial coverage (mentioned consensus but missed full orchestrator)
What ChatGPT Missed: Full distributed tracing implementation
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
class RalphTracer:
"""OpenTelemetry tracer for distributed tracing."""
def __init__(self, service_name: str = "ralph", endpoint: str = None):
provider = TracerProvider()
if endpoint:
exporter = OTLPSpanExporter(endpoint=endpoint)
provider.add_span_processor(BatchSpanProcessor(exporter))
trace.set_tracer_provider(provider)
self.tracer = trace.get_tracer(service_name)
self.propagator = TraceContextTextMapPropagator()
@contextmanager
def span(self, name: str, attributes: dict = None) -> trace.Span:
"""Create a span context."""
with self.tracer.start_as_current_span(name) as span:
if attributes:
for key, value in attributes.items():
span.set_attribute(key, value)
yield span
def inject_context(self, carrier: dict) -> dict:
"""Inject trace context for propagation."""
self.propagator.inject(carrier)
return carrier
def extract_context(self, carrier: dict) -> trace.Context:
"""Extract trace context from propagated headers."""
return self.propagator.extract(carrier)
async def trace_run(self, run_id: str, task: str, func: Callable, *args, **kwargs):
"""Trace a full run."""
with self.span("run", {"run_id": run_id, "task": task[:100]}) as span:
try:
result = await func(*args, **kwargs)
span.set_attribute("status", "success")
return result
except Exception as e:
span.set_attribute("status", "error")
span.set_attribute("error", str(e))
span.record_exception(e)
raise
async def trace_iteration(self, run_id: str, iteration: int, func: Callable, *args, **kwargs):
"""Trace a single iteration."""
with self.span("iteration", {"run_id": run_id, "iteration": iteration}):
return await func(*args, **kwargs)
async def trace_llm_call(self, model: str, tokens: int, func: Callable, *args, **kwargs):
"""Trace an LLM API call."""
with self.span("llm_call", {"model": model}) as span:
result = await func(*args, **kwargs)
span.set_attribute("tokens", tokens)
return result
Status: Production-ready, ChatGPT had partial coverage (mentioned tracer but missed implementation)
What ChatGPT Missed: Full metrics implementation with histograms
from prometheus_client import Counter, Gauge, Histogram, CollectorRegistry, push_to_gateway
class RalphMetrics:
"""Prometheus metrics for Ralph operations."""
def __init__(self, registry: CollectorRegistry = None, pushgateway_url: str = None):
self.registry = registry or CollectorRegistry()
self.pushgateway_url = pushgateway_url
# Counters
self.runs_total = Counter(
"ralph_runs_total",
"Total number of runs",
["status", "model"],
registry=self.registry
)
self.iterations_total = Counter(
"ralph_iterations_total",
"Total number of iterations",
["run_id", "outcome"],
registry=self.registry
)
self.llm_calls_total = Counter(
"ralph_llm_calls_total",
"Total LLM API calls",
["model", "status"],
registry=self.registry
)
self.tokens_total = Counter(
"ralph_tokens_total",
"Total tokens used",
["model", "type"], # type: prompt, completion
registry=self.registry
)
# Gauges
self.active_runs = Gauge(
"ralph_active_runs",
"Currently active runs",
registry=self.registry
)
self.worktrees_in_use = Gauge(
"ralph_worktrees_in_use",
"Worktrees currently in use",
registry=self.registry
)
self.dlq_size = Gauge(
"ralph_dlq_size",
"Dead letter queue size",
registry=self.registry
)
# Histograms
self.run_duration_seconds = Histogram(
"ralph_run_duration_seconds",
"Run duration in seconds",
["status"],
buckets=[1, 5, 10, 30, 60, 120, 300, 600],
registry=self.registry
)
self.iteration_duration_seconds = Histogram(
"ralph_iteration_duration_seconds",
"Iteration duration in seconds",
buckets=[0.5, 1, 2, 5, 10, 30, 60],
registry=self.registry
)
self.llm_latency_seconds = Histogram(
"ralph_llm_latency_seconds",
"LLM API latency in seconds",
["model"],
buckets=[0.1, 0.5, 1, 2, 5, 10, 30],
registry=self.registry
)
def record_run_start(self, run_id: str, model: str):
"""Record run start."""
self.active_runs.inc()
def record_run_end(self, run_id: str, model: str, status: str, duration: float):
"""Record run completion."""
self.active_runs.dec()
self.runs_total.labels(status=status, model=model).inc()
self.run_duration_seconds.labels(status=status).observe(duration)
def record_iteration(self, run_id: str, outcome: str, duration: float):
"""Record iteration."""
self.iterations_total.labels(run_id=run_id, outcome=outcome).inc()
self.iteration_duration_seconds.observe(duration)
def record_llm_call(self, model: str, status: str, latency: float, prompt_tokens: int, completion_tokens: int):
"""Record LLM API call."""
self.llm_calls_total.labels(model=model, status=status).inc()
self.llm_latency_seconds.labels(model=model).observe(latency)
self.tokens_total.labels(model=model, type="prompt").inc(prompt_tokens)
self.tokens_total.labels(model=model, type="completion").inc(completion_tokens)
def push(self, job: str = "ralph"):
"""Push metrics to Pushgateway."""
if self.pushgateway_url:
push_to_gateway(self.pushgateway_url, job=job, registry=self.registry)
Status: Production-ready, ChatGPT had partial coverage (mentioned metrics but missed implementation)
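Prometheus histograms are cumulative: an observation counts toward every bucket whose `le` bound covers it. A quick sketch of which bound a run duration first lands under, using the `ralph_run_duration_seconds` buckets above (`smallest_bucket` is an illustrative helper, not part of the metrics class):

```python
BUCKETS = [1, 5, 10, 30, 60, 120, 300, 600]  # ralph_run_duration_seconds buckets

def smallest_bucket(duration: float) -> float:
    """Smallest `le` bound containing the observation; +Inf if above all bounds."""
    for bound in BUCKETS:
        if duration <= bound:
            return bound
    return float("inf")

b1 = smallest_bucket(42.0)   # a 42s run first lands under le=60
b2 = smallest_bucket(900.0)  # above all explicit bounds, so only the +Inf bucket
```

Bucket boundaries are worth choosing against real latency distributions; observations far above the top bound all collapse into +Inf and lose resolution.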
What ChatGPT Missed: Full retry with error classification and policy builder
class ErrorClass(Enum):
TRANSIENT = "transient" # Network errors, rate limits
PERMANENT = "permanent" # Invalid input, auth failures
UNKNOWN = "unknown" # Unclassified errors
class RetryStopReason(Enum):
SUCCESS = "success"
MAX_ATTEMPTS = "max_attempts"
PERMANENT_ERROR = "permanent_error"
TIMEOUT = "timeout"
CANCELLED = "cancelled"
@dataclass
class RetryPolicy:
max_attempts: int = 3
initial_delay: float = 1.0
max_delay: float = 60.0
exponential_base: float = 2.0
jitter: bool = True
retryable_exceptions: tuple = (Exception,)
retryable_status_codes: tuple = (429, 500, 502, 503, 504)
class RetryEngine:
"""Retry with exponential backoff and error classification."""
def __init__(self, policy: RetryPolicy):
self.policy = policy
async def execute(
self,
func: Callable,
*args,
error_classifier: Callable[[Exception], ErrorClass] = None,
**kwargs
) -> tuple[Any, RetryStopReason]:
"""Execute with retry."""
error_classifier = error_classifier or self._default_classifier
last_error = None
for attempt in range(self.policy.max_attempts):
try:
result = await func(*args, **kwargs)
return result, RetryStopReason.SUCCESS
except self.policy.retryable_exceptions as e:
last_error = e
error_class = error_classifier(e)
if error_class == ErrorClass.PERMANENT:
return None, RetryStopReason.PERMANENT_ERROR
if attempt < self.policy.max_attempts - 1:
delay = self._calculate_delay(attempt)
await asyncio.sleep(delay)
return None, RetryStopReason.MAX_ATTEMPTS
def _calculate_delay(self, attempt: int) -> float:
"""Calculate delay with exponential backoff and optional jitter."""
delay = min(
self.policy.initial_delay * (self.policy.exponential_base ** attempt),
self.policy.max_delay
)
if self.policy.jitter:
delay *= (0.5 + random.random())
return delay
@staticmethod
def _default_classifier(error: Exception) -> ErrorClass:
"""Default error classification."""
if isinstance(error, (TimeoutError, ConnectionError)):
return ErrorClass.TRANSIENT
if isinstance(error, (ValueError, TypeError)):
return ErrorClass.PERMANENT
return ErrorClass.UNKNOWN
class PolicyBuilder:
"""Fluent builder for retry policies."""
def __init__(self):
self._policy = RetryPolicy()
def max_attempts(self, n: int) -> 'PolicyBuilder':
self._policy.max_attempts = n
return self
def initial_delay(self, seconds: float) -> 'PolicyBuilder':
self._policy.initial_delay = seconds
return self
def max_delay(self, seconds: float) -> 'PolicyBuilder':
self._policy.max_delay = seconds
return self
def exponential_base(self, base: float) -> 'PolicyBuilder':
self._policy.exponential_base = base
return self
def with_jitter(self, enabled: bool = True) -> 'PolicyBuilder':
self._policy.jitter = enabled
return self
def retry_on(self, *exceptions: type) -> 'PolicyBuilder':
self._policy.retryable_exceptions = exceptions
return self
def build(self) -> RetryPolicy:
return self._policy
Status: Production-ready, ChatGPT had 0% coverage
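The backoff schedule implied by `_calculate_delay` is easy to inspect with jitter disabled. A standalone sketch using the `RetryPolicy` defaults (initial 1.0s, base 2.0, cap 60.0):

```python
def backoff_delay(attempt: int, initial: float = 1.0, base: float = 2.0, cap: float = 60.0) -> float:
    # Same formula as RetryEngine._calculate_delay, without the jitter multiplier.
    return min(initial * (base ** attempt), cap)

delays = [backoff_delay(a) for a in range(7)]
# doubles each attempt until max_delay caps it: 1, 2, 4, 8, 16, 32, then 60 (not 64)
```

With jitter enabled, each delay is multiplied by a random factor in [0.5, 1.5), which spreads out retries from many concurrent callers instead of letting them hammer a recovering service in lockstep.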
What ChatGPT Missed: Capability-based agent discovery and routing
class SkillLevel(Enum):
NOVICE = 1
INTERMEDIATE = 2
ADVANCED = 3
EXPERT = 4
@dataclass
class Skill:
name: str
level: SkillLevel
task_types: list[str]
keywords: list[str]
@dataclass
class AgentCapability:
"""Full capability description for an agent."""
agent_id: str
name: str
skills: list[Skill]
task_types: list[str]
languages: list[str]
frameworks: list[str]
resources: ResourceConstraint
preferred_models: list[str]
max_concurrent_tasks: int
tags: list[str]
class RoutingPolicy(Enum):
BEST_MATCH = "best_match"
LEAST_LOADED = "least_loaded"
ROUND_ROBIN = "round_robin"
RANDOM = "random"
AFFINITY = "affinity"
class AgentRegistry:
"""Registry for agent discovery and capability matching."""
def __init__(self):
self.agents: dict[str, AgentCapability] = {}
self.agent_loads: dict[str, int] = {}
self._round_robin_index = 0
def register(self, capability: AgentCapability):
"""Register an agent's capabilities."""
self.agents[capability.agent_id] = capability
self.agent_loads[capability.agent_id] = 0
def unregister(self, agent_id: str):
"""Remove an agent from registry."""
self.agents.pop(agent_id, None)
self.agent_loads.pop(agent_id, None)
def find_by_skill(self, skill_name: str, min_level: SkillLevel = SkillLevel.NOVICE) -> list[AgentCapability]:
"""Find agents with a specific skill at minimum level."""
return [
agent for agent in self.agents.values()
if any(s.name == skill_name and s.level.value >= min_level.value for s in agent.skills)
]
def find_by_task_type(self, task_type: str) -> list[AgentCapability]:
"""Find agents that can handle a task type."""
return [
agent for agent in self.agents.values()
if task_type in agent.task_types
]
def find_by_tags(self, tags: list[str]) -> list[AgentCapability]:
"""Find agents matching all specified tags."""
return [
agent for agent in self.agents.values()
if all(tag in agent.tags for tag in tags)
]
class TaskRouter:
"""Routes tasks to appropriate agents."""
def __init__(self, registry: AgentRegistry, policy: RoutingPolicy = RoutingPolicy.BEST_MATCH):
self.registry = registry
self.policy = policy
async def route(self, task: Task) -> Optional[str]:
"""Route a task to an agent."""
candidates = self._find_candidates(task)
if not candidates:
return None
if self.policy == RoutingPolicy.BEST_MATCH:
return self._select_best_match(task, candidates)
elif self.policy == RoutingPolicy.LEAST_LOADED:
return self._select_least_loaded(candidates)
elif self.policy == RoutingPolicy.ROUND_ROBIN:
return self._select_round_robin(candidates)
elif self.policy == RoutingPolicy.RANDOM:
return random.choice(candidates).agent_id
elif self.policy == RoutingPolicy.AFFINITY:
return self._select_affinity(task, candidates)
def _find_candidates(self, task: Task) -> list[AgentCapability]:
"""Find all agents capable of handling the task."""
candidates = []
for agent in self.registry.agents.values():
if self._can_handle(agent, task):
candidates.append(agent)
return candidates
def _can_handle(self, agent: AgentCapability, task: Task) -> bool:
"""Check if agent can handle task."""
# Check task type
if task.type and task.type not in agent.task_types:
return False
# Check load
if self.registry.agent_loads.get(agent.agent_id, 0) >= agent.max_concurrent_tasks:
return False
# Check resources
if task.resources and not self._resources_satisfied(agent.resources, task.resources):
return False
return True
def _select_best_match(self, task: Task, candidates: list[AgentCapability]) -> str:
"""Select agent with best skill match."""
scores = []
for agent in candidates:
score = self._calculate_match_score(task, agent)
scores.append((agent.agent_id, score))
scores.sort(key=lambda x: x[1], reverse=True)
return scores[0][0]
def _calculate_match_score(self, task: Task, agent: AgentCapability) -> float:
"""Calculate how well agent matches task."""
score = 0.0
# Skill level bonus
for skill in agent.skills:
if any(kw in task.description.lower() for kw in skill.keywords):
score += skill.level.value * 0.25
# Preferred model bonus
if task.preferred_model in agent.preferred_models:
score += 1.0
# Load penalty
load_factor = self.registry.agent_loads.get(agent.agent_id, 0) / agent.max_concurrent_tasks
score *= (1 - load_factor * 0.5)
return score
Status: Production-ready, ChatGPT had 0% coverage
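The load penalty in `_calculate_match_score` is multiplicative, so an equally skilled but saturated agent loses to an idle one. A pared-down sketch of that scoring (the `Agent` dataclass and `match_score` helper are illustrative stand-ins for `AgentCapability` and the real method):

```python
from dataclasses import dataclass

@dataclass
class Agent:
    agent_id: str
    keywords: list[str]
    skill_level: int  # 1-4, as in SkillLevel
    load: float       # fraction of max_concurrent_tasks in use, 0.0-1.0

def match_score(description: str, agent: Agent) -> float:
    # Mirrors _calculate_match_score: keyword hits weighted by skill level,
    # then a load penalty that halves the score at full load.
    score = 0.0
    if any(kw in description.lower() for kw in agent.keywords):
        score += agent.skill_level * 0.25
    return score * (1 - agent.load * 0.5)

agents = [
    Agent("a1", ["python", "api"], skill_level=4, load=0.0),
    Agent("a2", ["python"], skill_level=4, load=1.0),
]
best = max(agents, key=lambda a: match_score("Build a Python API", a))
# a1 wins: equal skills, but a2 at full load has its score halved
```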
| Component | ChatGPT Coverage | Actual Status |
|---|---|---|
| 12-Layer Architecture | ✅ Complete | Production-ready |
| EnterpriseFlywheel Core | ✅ Complete | Production-ready |
| Beads Event Store | ✅ Complete | Production-ready |
| RecoveryManager | ✅ Complete | Production-ready |
| DeadLetterQueue | ✅ Complete | Production-ready |
| WorktreePool | ✅ Complete | Production-ready |
| SafetyGuard | ✅ Complete | Production-ready |
| CostTracker | ✅ Complete | Production-ready |
| Consensus Engine | ✅ Complete | Production-ready |
| LLMRouter | ✅ Complete | Production-ready |
| Company Operations | ❌ Missing | Production-ready |
| Cancellation Tokens | ❌ Missing | Production-ready |
| Resource Scheduler | ❌ Missing | Production-ready |
| Agent Mail Protocol | ❌ Missing | Production-ready |
| Git Checkpoint Manager | ❌ Missing | Production-ready |
| Cloud Storage Backends | ❌ Missing | Production-ready |
| Artifact Store | ❌ Missing | Production-ready |
| Semantic Memory (embeddings) | 🟡 Partial | Production-ready |
| Design Patterns | ❌ Missing | Production-ready |
| Letta 0.16+ API Details | 🟡 Partial | Production-ready |
| Reflexion Loop (full flow) | 🟡 Partial | Production-ready |
| Full State Machine | 🟡 Partial | Production-ready |
| Validator Framework | ❌ Missing | Production-ready |
| Full Orchestrator Modes | 🟡 Partial | Production-ready |
| OpenTelemetry Implementation | 🟡 Partial | Production-ready |
| Prometheus Implementation | 🟡 Partial | Production-ready |
| Retry Engine | ❌ Missing | Production-ready |
| Agent Registry + TaskRouter | ❌ Missing | Production-ready |
ChatGPT's BLACKICE-SPEC-2.0 captured the architectural vision correctly but, per the table above, entirely missed 11 production-ready systems and had only partial coverage on 7 others. The codebase is significantly more mature than the spec suggested, with full implementations of:
- Operational Infrastructure: Company operations, deployment automation, project scaffolding
- Execution Control: Cancellation tokens, resource scheduling, retry policies
- Communication: Inter-agent mail protocol with delivery guarantees
- Persistence: Git checkpointing, cloud storage, artifact management
- Intelligence: Semantic memory with embeddings, reflexion learning loop
- Code Quality: Design patterns, validation chains, composite validators
- Observability: Full OpenTelemetry + Prometheus implementations
- Coordination: Agent registry, capability matching, task routing
The true BLACKICE system is enterprise-grade, with 186KB of core orchestration code alone.
Generated through code archaeology by Claude Opus 4.5 Source: /Users/speed/proxmox/blackice/
Original gist: f92f5648c958c604c514f26d3ad4f1fd
BLACKICE 2.0 Use Cases: Regulated code gen, CI/CD, cost tracking, compliance audits
When to use BLACKICE 2.0: Auditable, validated, reproducible AI code generation for enterprise
Problem: Hospital needs AI to generate HIPAA-compliant API endpoints
WITHOUT BLACKICE 2.0:
├── Task: "Generate patient data API"
├── LLM generates code...
├── Maybe it's compliant? Maybe not?
├── No audit trail
└── Compliance officer: "Prove this is safe" ← You can't
WITH BLACKICE 2.0:
├── Spec validated via SHACL (required fields: auth, encryption, logging)
├── SPARQL checks: "Does output contain PHI handling?"
├── blake3 receipt: spec_hash → output_hash (immutable proof)
├── Receipt store: "Task X at time Y produced code Z with model A"
└── Compliance officer: "Show me the audit trail" ← Here's the receipt chain
Key Features Used:
- SHACL validation with healthcare-specific shapes
- Receipt store for SOC2/HIPAA compliance
- Cryptographic hash chain for audit integrity
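The receipt mechanics above can be sketched in a few lines of Python. This is a minimal sketch, not the BLACKICE API: `make_receipt` and `digest` are illustrative names, and since blake3 is a third-party package, `hashlib.blake2b` stands in here with the same `hexdigest()` usage shape.

```python
import hashlib
import json
from datetime import datetime, timezone

def digest(data: bytes) -> str:
    # Stand-in for blake3(data).hexdigest(); blake2b ships with Python.
    return hashlib.blake2b(data, digest_size=32).hexdigest()

def make_receipt(task_id, spec, input_text, output_text, model, tokens,
                 parent_receipt_id=None):
    """Build a receipt that commits to spec, input, and output by hash."""
    receipt = {
        "task_id": task_id,
        "spec_hash": digest(json.dumps(spec, sort_keys=True).encode()),
        "input_hash": digest(input_text.encode()),
        "output_hash": digest(output_text.encode()),
        "model_used": model,
        "tokens_used": tokens,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "parent_receipt_id": parent_receipt_id,
    }
    # The receipt id commits to the whole receipt body.
    receipt["receipt_id"] = digest(json.dumps(receipt, sort_keys=True).encode())[:16]
    return receipt

r = make_receipt("api-gen-001", {"type": "codegen"}, "prompt", "code",
                 "claude-sonnet-4-20250514", 12847)
# An auditor later re-hashes the artifact and compares:
assert digest(b"code") == r["output_hash"]
```

Because `spec_hash` is computed over a canonical (sorted-keys) serialization, the same spec always yields the same hash, which is what makes "re-run with same spec_hash" meaningful.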
Problem: 50 developers using AI agents, need quality gates
┌─────────────────────────────────────────────────────────────┐
│ ENTERPRISE CODE FACTORY │
├─────────────────────────────────────────────────────────────┤
│ │
│ Developer submits task │
│ ↓ │
│ SHACL Validation │
│ ├── "Missing target language" → REJECTED (save tokens!) │
│ ├── "Token budget too high" → REJECTED (save money!) │
│ └── "Dependencies unmet" → BLOCKED (prevent failures!) │
│ ↓ │
│ SPARQL Query: Find ready tasks in dependency order │
│ ↓ │
│ BLACKICE executes with Reflexion (self-improving) │
│ ↓ │
│ Receipt generated → Manager dashboard shows: │
│ ├── Tasks completed: 847 │
│ ├── Tokens spent: $2,341 │
│ ├── Success rate: 94.2% │
│ └── Audit-ready: ✓ │
│ │
└─────────────────────────────────────────────────────────────┘
Key Features Used:
- Pre-execution SHACL validation (reject bad tasks before spending tokens)
- SPARQL dependency queries
- Receipt-based metrics dashboard
Problem: Automated PR generation needs guardrails
# Without spec validation - bad things happen:
task = "refactor auth module"
# LLM deletes security checks, introduces SQL injection
# No record of what happened or why
# With BLACKICE 2.0:
spec = (
    TaskSpecBuilder()
    .task("refactor-auth-001")
    .of_type("refactor")
    .description("Refactor auth module for readability")
    .priority(2)
    .token_budget(50000)
    .constraints({
        "preserve_patterns": ["bcrypt", "jwt_verify", "rate_limit"],
        "forbidden_patterns": ["eval(", "exec(", "raw SQL"],
        "require_tests": True,
    })
    .build()
)
# SHACL validates constraints exist
# Reflexion loop checks output against constraints
# Receipt proves: "spec required bcrypt preservation, output contains bcrypt"
SHACL Shape for Security Constraints:
task:RefactorTaskShape a sh:NodeShape ;
sh:targetClass task:RefactorTask ;
sh:property [
sh:path task:preservePatterns ;
sh:minCount 1 ;
sh:message "Refactor tasks MUST specify patterns to preserve"
] ;
sh:property [
sh:path task:forbiddenPatterns ;
sh:minCount 1 ;
sh:message "Refactor tasks MUST specify forbidden patterns"
] .
Key Features Used:
- TaskSpecBuilder for type-safe task creation
- SHACL security constraints
- Reflexion validates output against constraints
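The shape's `sh:minCount` rules, plus the Reflexion-side output check, can be mirrored in plain Python. This is a simplified stand-in for SHACL validation, not the real validator; the function names are illustrative.

```python
def validate_refactor_spec(constraints: dict) -> list[str]:
    """Pre-execution gate mirroring the shape's sh:minCount 1 rules."""
    errors = []
    if not constraints.get("preserve_patterns"):
        errors.append("Refactor tasks MUST specify patterns to preserve")
    if not constraints.get("forbidden_patterns"):
        errors.append("Refactor tasks MUST specify forbidden patterns")
    return errors

def check_output(output: str, constraints: dict) -> list[str]:
    """Post-generation check of the kind the Reflexion loop applies."""
    problems = [f"lost required pattern: {p}"
                for p in constraints["preserve_patterns"] if p not in output]
    problems += [f"contains forbidden pattern: {p}"
                 for p in constraints["forbidden_patterns"] if p in output]
    return problems

constraints = {"preserve_patterns": ["bcrypt", "jwt_verify"],
               "forbidden_patterns": ["eval(", "exec("]}
assert validate_refactor_spec(constraints) == []   # well-formed spec passes the gate
assert validate_refactor_spec({}) != []            # empty spec is rejected before any tokens are spent
```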
Problem: "Our AI generated this code 6 months ago, can we regenerate it?"
WITHOUT receipts:
├── Which model version?
├── Which prompt?
├── Which parameters?
└── Answer: "We don't know" ← Research not reproducible
WITH BLACKICE 2.0 receipts:
{
"receipt_id": "a1b2c3d4",
"spec_hash": "e5f6g7h8", ← Exact spec used
"input_hash": "i9j0k1l2", ← Exact input
"output_hash": "m3n4o5p6", ← Exact output
"model_used": "claude-sonnet-4-20250514",
"tokens_used": 12847,
"timestamp": "2025-07-15T14:30:00Z",
"parent_receipt_id": null ← First attempt
}
# Re-run with same spec_hash → deterministic scaffold
# Reflexion may improve, but base is reproducible
Verification Query:
# Verify output hasn't been tampered with
receipt = receipt_store.get("a1b2c3d4")
current_hash = blake3(current_output).hexdigest()
if current_hash == receipt.output_hash:
print("✓ Output verified - matches original generation")
else:
print("✗ Output modified since generation!")Key Features Used:
- blake3 cryptographic hashing
- Receipt chain for full provenance
- Spec hash for reproducibility
Problem: "Which team is burning all our API credits?"
-- Query receipt store for cost attribution
SELECT
SUBSTR(task_id, 1, INSTR(task_id, '-') - 1) as team,
SUM(tokens_used) as total_tokens,
COUNT(*) as task_count,
SUM(tokens_used) * 0.00002 as cost_usd
FROM receipts
WHERE timestamp > '2025-01-01'
GROUP BY team
ORDER BY total_tokens DESC;
Result:
| Team | Tokens | Tasks | Cost |
|---|---|---|---|
| team-ml | 5.2M | 423 | $104 |
| team-frontend | 2.4M | 892 | $48 |
| team-backend | 1.1M | 341 | $22 |
| team-infra | 800K | 156 | $16 |
Budget Enforcement via SHACL:
task:BudgetShape a sh:NodeShape ;
sh:targetClass task:Task ;
sh:property [
sh:path task:maxTokenBudget ;
sh:maxInclusive 100000 ;
sh:message "Token budget exceeds team limit of 100K"
] .
Key Features Used:
- Receipt store SQL queries
- SHACL budget constraints
- Per-task cost tracking
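The attribution query runs against any SQL receipt store. A self-contained sqlite3 demo of the same query; the sample data assumes task ids follow a `<team>-<nnn>` convention so the team prefix ends at the first hyphen.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE receipts (task_id TEXT, tokens_used INTEGER, timestamp TEXT)")
conn.executemany("INSERT INTO receipts VALUES (?, ?, ?)", [
    ("ml-001", 3_000_000, "2025-02-01"),
    ("ml-002", 2_200_000, "2025-02-02"),
    ("frontend-001", 2_400_000, "2025-02-03"),
])

query = """
SELECT SUBSTR(task_id, 1, INSTR(task_id, '-') - 1) AS team,
       SUM(tokens_used) AS total_tokens,
       COUNT(*) AS task_count,
       SUM(tokens_used) * 0.00002 AS cost_usd
FROM receipts
WHERE timestamp > '2025-01-01'
GROUP BY team
ORDER BY total_tokens DESC
"""
for team, tokens, count, cost in conn.execute(query):
    print(f"{team}: {tokens:,} tokens, {count} tasks, ${cost:,.2f}")
# ml: 5,200,000 tokens, 2 tasks, $104.00
# frontend: 2,400,000 tokens, 1 tasks, $48.00
```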
Problem: Tasks have dependencies, need execution order
Task Specification (RDF):
@prefix task: <http://blackice.dev/ontology/task#> .
tasks:generate-models a task:CodeGenTask ;
task:hasDescription "Generate SQLAlchemy models from schema" ;
task:hasPriority 0 ;
task:targetLanguage "python" ;
task:maxTokenBudget 30000 .
tasks:generate-api a task:CodeGenTask ;
task:hasDescription "Generate FastAPI routes" ;
task:hasPriority 1 ;
task:dependsOn tasks:generate-models . # ← Must wait
tasks:generate-tests a task:TestTask ;
task:hasDescription "Generate pytest tests for API" ;
task:hasPriority 2 ;
task:dependsOn tasks:generate-api . # ← Must wait
tasks:generate-docs a task:CodeGenTask ;
task:hasDescription "Generate OpenAPI documentation" ;
task:hasPriority 3 ;
task:dependsOn tasks:generate-api . # ← Can run parallel with tests
SPARQL Query: Find Ready Tasks:
PREFIX task: <http://blackice.dev/ontology/task#>
SELECT ?task ?description ?priority
WHERE {
?task a task:Task ;
task:hasDescription ?description ;
task:hasPriority ?priority ;
task:status "pending" .
# No incomplete dependencies
FILTER NOT EXISTS {
?task task:dependsOn ?dep .
?dep task:status ?depStatus .
FILTER(?depStatus != "completed")
}
}
ORDER BY ?priority
Execution Flow:
Time 0: Ready = [generate-models]
Execute generate-models...
Time 1: Ready = [generate-api] (models completed)
Execute generate-api...
Time 2: Ready = [generate-tests, generate-docs] (api completed)
Execute BOTH in parallel via DAGExecutor...
Time 3: All complete ✓
Key Features Used:
- RDF task specifications with dependencies
- SPARQL ready-task queries
- DAGExecutor for parallel execution
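The ready-task semantics of the SPARQL query can be mirrored in plain Python to see the scheduling behavior. A simplified sketch, not the RDF implementation: a task is ready when it is pending and every dependency is completed.

```python
def ready_tasks(tasks: dict[str, dict]) -> list[str]:
    """Pending tasks whose dependencies are all completed, by priority."""
    ready = [
        name for name, t in tasks.items()
        if t["status"] == "pending"
        and all(tasks[d]["status"] == "completed" for d in t.get("depends_on", []))
    ]
    return sorted(ready, key=lambda n: tasks[n]["priority"])

tasks = {
    "generate-models": {"status": "pending", "priority": 0},
    "generate-api":    {"status": "pending", "priority": 1, "depends_on": ["generate-models"]},
    "generate-tests":  {"status": "pending", "priority": 2, "depends_on": ["generate-api"]},
    "generate-docs":   {"status": "pending", "priority": 3, "depends_on": ["generate-api"]},
}
print(ready_tasks(tasks))                        # ['generate-models']
tasks["generate-models"]["status"] = "completed"
print(ready_tasks(tasks))                        # ['generate-api']
```

Once `generate-api` completes, both `generate-tests` and `generate-docs` become ready at the same time, which is exactly the window the DAGExecutor exploits for parallel execution.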
Problem: Task failed after 10 retries, why?
# Query receipt chain for failed task
receipts = receipt_store.get_by_task("task-xyz")
print("=== FAILURE FORENSICS ===
")
for i, r in enumerate(receipts, 1):
print(f"""
Attempt {i}:
Receipt: {r.receipt_id}
Model: {r.model_used}
Tokens: {r.tokens_used:,}
Time: {r.time_elapsed_ms}ms
Status: {r.status}
Parent: {r.parent_receipt_id or 'None (first attempt)'}
""")Output:
=== FAILURE FORENSICS ===
Attempt 1:
Receipt: a1b2c3d4
Model: claude-sonnet-4-20250514
Tokens: 15,234
Time: 4,521ms
Status: failed
Parent: None (first attempt)
Attempt 2:
Receipt: e5f6g7h8
Model: claude-sonnet-4-20250514
Tokens: 18,109
Time: 5,892ms
Status: failed
Parent: a1b2c3d4
Attempt 3:
Receipt: i9j0k1l2
Model: gpt-4o ← LLMRouter tried different model
Tokens: 22,847
Time: 8,234ms
Status: failed
Parent: e5f6g7h8
...
Attempt 10:
Receipt: q5r6s7t8
Model: claude-opus-4-20250514 ← Escalated to most capable
Tokens: 45,123
Time: 15,234ms
Status: failed
Parent: m1n2o3p4
DIAGNOSIS: All models failed → Spec likely impossible
ACTION: Review spec constraints, check SHACL validation
Root Cause Query:
-- Find tasks with high failure rates
SELECT
task_id,
COUNT(*) as attempts,
SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failures,
SUM(tokens_used) as wasted_tokens
FROM receipts
GROUP BY task_id
HAVING failures > 3
ORDER BY wasted_tokens DESC;
Key Features Used:
- Receipt chain with parent_receipt_id
- Failure forensics queries
- Token waste analysis
Problem: SOC2 auditor needs evidence of AI code generation controls
# Export audit log for date range
audit_log = receipt_store.export_audit_log(
start_date="2025-01-01",
end_date="2025-03-31"
)
# Save for auditor
with open("Q1_2025_audit_log.json", "w") as f:
    f.write(audit_log)
Audit Log Format:
{
"export_timestamp": "2025-04-01T09:00:00Z",
"receipt_count": 12847,
"receipts": [
{
"receipt_id": "a1b2c3d4",
"task_id": "api-gen-001",
"spec_hash": "e5f6g7h8i9j0k1l2m3n4o5p6q7r8s9t0",
"input_hash": "u1v2w3x4y5z6a7b8c9d0e1f2g3h4i5j6",
"output_hash": "k7l8m9n0o1p2q3r4s5t6u7v8w9x0y1z2",
"model_used": "claude-sonnet-4-20250514",
"tokens_used": 15234,
"time_elapsed_ms": 4521,
"status": "success",
"timestamp": "2025-01-15T14:30:00Z"
},
...
]
}
Auditor Questions Answered:
| Question | Answer (from receipts) |
|---|---|
| "What AI models were used?" | Unique models in model_used field |
| "How much was spent?" | Sum of tokens_used × rate |
| "Were outputs validated?" | SHACL validation in spec layer |
| "Can you reproduce outputs?" | Yes, via spec_hash |
| "Is there an audit trail?" | Yes, receipt chain with hashes |
Key Features Used:
- Receipt store export
- Cryptographic integrity verification
- Compliance-ready JSON format
| BLACKICE 2.0 Feature | Primary Use Case |
|---|---|
| SHACL Validation | Quality gates, budget enforcement |
| SPARQL Queries | Dependency scheduling, ready tasks |
| Receipt Store | Audit compliance, cost attribution |
| blake3 Hashing | Reproducibility, integrity verification |
| TaskSpecBuilder | Type-safe task creation |
| Receipt Chains | Failure forensics, retry tracking |
| Audit Export | SOC2/HIPAA/GDPR compliance |
Need auditable AI code generation? → Receipt Store
Need pre-execution validation? → SHACL Shapes
Need dependency-aware scheduling? → SPARQL Queries
Need reproducible outputs? → blake3 Hashing
Need cost tracking? → Receipt Queries
Need failure debugging? → Receipt Chains
Need compliance evidence? → Audit Export
BLACKICE 2.0: Enterprise-grade AI code generation with full auditability
Original gist: d6e9b931fb39ce73d7da3545061bcc28
BLACKICE Complete System Context Drop - 54K+ lines, 72 features, 19 sources consolidated
Version: 2.0 (EnterpriseFlywheel) Generated: 2026-01-07 Sources: 19 analyzed projects + existing codebase (54,390 lines) Purpose: Full context for continuing BLACKICE development
- System Overview
- Architecture Layers
- Core Components
- EnterpriseFlywheel (Unified Orchestrator)
- Beads Event Store
- Ultimate Features Roadmap
- Conflict Resolutions
- Implementation Sketches
- Infrastructure
- Quick Start
BLACKICE is an autonomous multi-agent AI coding framework that orchestrates planning, implementation, QA, and deployment without continuous human intervention.
┌─────────────────────────────────────────────────────────────────────────┐
│ ITERATE UNTIL SUCCESS │
│ │
│ Task → Route → Execute → Evaluate → Learn → Retry (if needed) │
│ │
│ All state persisted in Beads. All decisions auditable. │
│ All failures recoverable. All agents coordinated. │
└─────────────────────────────────────────────────────────────────────────┘
| Metric | Value |
|---|---|
| Total Lines of Code | 54,390+ |
| Architecture Layers | 12 |
| Event Types | 40+ |
| Consensus Strategies | 6 |
| LLM Adapters | 5 |
| Worker Pool Size | 4 (configurable) |
┌─────────────────────────────────────────────────────────────────────────┐
│ L12: CLI Interface │
│ Commands: blackice run, blackice doctor, blackice recover │
├─────────────────────────────────────────────────────────────────────────┤
│ L11: Orchestrator │
│ AgentRegistry, Supervisor, MessageBroker, ConsensusEngine │
├─────────────────────────────────────────────────────────────────────────┤
│ L10: EnterpriseFlywheel │
│ Unified integration of all capabilities (186KB) │
├─────────────────────────────────────────────────────────────────────────┤
│ L9: Reflexion Loop │
│ Multi-dimensional quality scoring, prompt refinement │
├─────────────────────────────────────────────────────────────────────────┤
│ L8: Recovery Layer │
│ RecoveryManager, DeadLetterQueue, WorktreePool │
├─────────────────────────────────────────────────────────────────────────┤
│ L7: Persistence Layer │
│ Beads Event Store, Snapshots, Artifact Store │
├─────────────────────────────────────────────────────────────────────────┤
│ L6: Instrumentation │
│ SafetyGuard, CostTracker, LoopFingerprint, Metrics, Tracer │
├─────────────────────────────────────────────────────────────────────────┤
│ L5: Service Colony │
│ Worker management, task distribution, result aggregation │
├─────────────────────────────────────────────────────────────────────────┤
│ L4: Core Loop │
│ DAGExecutor, WorkflowDAG, parallel execution │
├─────────────────────────────────────────────────────────────────────────┤
│ L3: Adapters │
│ OllamaAdapter, LettaAdapter, ClaudeProxyAdapter, CodexAdapter │
├─────────────────────────────────────────────────────────────────────────┤
│ L2: Dispatcher │
│ Backend routing (ai-factory, speckit, LLM) │
├─────────────────────────────────────────────────────────────────────────┤
│ L1: Infrastructure │
│ Ollama (11434), Letta (8283), PostgreSQL (5432), LiteLLM (4000) │
└─────────────────────────────────────────────────────────────────────────┘
The unified orchestrator integrating ALL capabilities:
class EnterpriseFlywheel:
"""186KB unified orchestrator - the heart of BLACKICE."""
components = {
# Phase 1: Foundation
"LLMRouter": "Intelligent model selection",
"DAGExecutor": "Parallel workflow execution",
"WorktreePool": "Git worktree isolation per task",
"RecoveryManager": "Crash recovery from Beads events",
"DeadLetterQueue": "Failed task handling with retry",
"SafetyGuard": "Policy enforcement, loop detection",
"CostTracker": "Token/time budget management",
"LettaAdapter": "Persistent memory across sessions",
"Dispatcher": "Backend routing",
# Phase 2: Intelligence
"ReflexionLoop": "Multi-dimensional quality scoring",
"LoopFingerprint": "Advanced behavioral loop detection",
"RalphMetrics": "Prometheus metrics export",
"RalphTracer": "OpenTelemetry distributed tracing",
"SmartRouter": "Capability-based routing",
# Phase 5: Operations
"CompanyOperations": "GitHub, deployment, scaffolding",
"MonitoringFeedback": "Production metrics feedback",
"TestRunner": "Automated test execution",
# Phase 6: Adapters
"AdapterChain": "Unified LLM execution",
"SemanticMemory": "Embedding-based continual learning",
}
Append-only SQLite event log with 40+ event types:
class EventType(Enum):
# Run lifecycle (8 events)
RUN_STARTED = "run_started"
RUN_STATE_TRANSITION = "run_state_transition"
RUN_COMPLETED = "run_completed"
RUN_FAILED = "run_failed"
RUN_ABORTED = "run_aborted"
RUN_PAUSED = "run_paused"
RUN_RESUMING = "run_resuming"
RUN_CANCELLED = "run_cancelled"
# Task lifecycle (7 events)
TASK_QUEUED = "task_queued"
TASK_STARTED = "task_started"
TASK_PROGRESS = "task_progress"
TASK_SUCCEEDED = "task_succeeded"
TASK_FAILED = "task_failed"
TASK_CANCELLED = "task_cancelled"
TASK_RETRY = "task_retry"
# Worktree management (7 events)
WORKTREE_CREATED = "worktree_created"
WORKTREE_ACQUIRED = "worktree_acquired"
WORKTREE_RELEASED = "worktree_released"
WORKTREE_MERGED = "worktree_merged"
WORKTREE_DISCARDED = "worktree_discarded"
WORKTREE_FAILED = "worktree_failed"
WORKTREE_ORPHAN_CLEANED = "worktree_orphan_cleaned"
# Recovery (4 events)
RECOVERY_STARTED = "recovery_started"
RECOVERY_PLAN_BUILT = "recovery_plan_built"
RECOVERY_COMPLETED = "recovery_completed"
RECOVERY_FAILED = "recovery_failed"
# Dead Letter Queue (4 events)
DLQ_ENQUEUED = "dlq_enqueued"
DLQ_RETRIED = "dlq_retried"
DLQ_DISCARDED = "dlq_discarded"
DLQ_EXPIRED = "dlq_expired"
# ... 10+ more
6 voting strategies for multi-agent coordination:
class ConsensusStrategy(Enum):
MAJORITY = "majority" # >50% agreement
SUPERMAJORITY = "supermajority" # >66% agreement
UNANIMOUS = "unanimous" # 100% agreement
QUORUM = "quorum" # Minimum voters required
FIRST_N = "first_n" # First N agreeing votes
WEIGHTED = "weighted" # Reputation-weighted votingUnified LLM execution with fallback:
class AdapterChain:
"""Routes through adapters based on model and availability."""
priority_map = {
"claude": ["claude_proxy", "letta", "ollama"],
"gpt": ["letta", "ollama"],
"local": ["ollama", "letta", "claude_proxy"],
}
model_remap = {
"claude-3-sonnet": "llama3.2:3b",
"claude-3-opus": "llama3.2:3b",
"gpt-4": "llama3.2:3b",
}
Policy enforcement with checkpoints:
class Checkpoint(Enum):
START_OF_RUN = "start_of_run"
BEFORE_ITERATION = "before_iteration"
AFTER_TOOL_CALL = "after_tool_call"
BEFORE_RETRY = "before_retry"
END_OF_RUN = "end_of_run"
class SafetyAction(Enum):
ALLOW = "allow"
ABORT = "abort"
MITIGATE = "mitigate"
ESCALATE = "escalate"@dataclass
class EnterpriseFlywheelConfig:
# Safety limits
max_iterations: int = 10
loop_detection_threshold: int = 3
# Cost limits
max_tokens_per_task: int = 100_000
max_time_per_task_seconds: int = 600
# Model routing
default_model: str = "claude-sonnet-4-20250514"
vision_model: str = "gpt-4o"
simple_model: str = "ollama/qwen2.5-coder"
# Infrastructure
beads_db_path: Path = Path("~/.beads/beads.db")
worktree_base: Path = Path("/tmp/ralph-worktrees")
worker_pool_size: int = 4
# Dead Letter Queue
dlq_max_retries: int = 3
dlq_expiry_hours: int = 24
# Observability
metrics_enabled: bool = True
tracing_enabled: bool = True
structured_logging: bool = True
┌─────────────────────────────────────────────────────────────────────────┐
│ EnterpriseFlywheel.run() │
├─────────────────────────────────────────────────────────────────────────┤
│ 1. SafetyGuard.evaluate(START_OF_RUN) │
│ └── Check policies, verify not loop │
│ │
│ 2. WorktreePool.acquire(task_id) │
│ └── Get isolated git worktree for task │
│ │
│ 3. For iteration in range(max_iterations): │
│ ├── SafetyGuard.evaluate(BEFORE_ITERATION) │
│ ├── CostTracker.can_continue(task_id) │
│ ├── LLMRouter.select_model(task) │
│ ├── AdapterChain.execute(prompt, model) │
│ ├── SafetyGuard.evaluate(AFTER_TOOL_CALL) │
│ ├── ReflexionLoop.evaluate(result) │
│ ├── PatternLearner.record(task, result) │
│ └── If success: break │
│ │
│ 4. WorktreePool.release(worktree) │
│ │
│ 5. If failed: DeadLetterQueue.enqueue(task, reason) │
│ │
│ 6. Beads.append(RUN_COMPLETED or RUN_FAILED) │
└─────────────────────────────────────────────────────────────────────────┘
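The numbered flow above reduces to a short control loop. A stubbed sketch, assuming each collaborator exposes the named method; `Stub` is a permissive stand-in, not the real components, and `run_task` is an illustrative name, not the 186KB implementation.

```python
from types import SimpleNamespace

class Stub:
    """Permissive stand-in: any method call succeeds and returns a
    truthy object whose .passed is True."""
    def __getattr__(self, name):
        return lambda *args, **kwargs: SimpleNamespace(passed=True)

def run_task(task, guard, pool, cost, router, chain, reflexion, dlq,
             max_iterations=10):
    """Control loop mirroring steps 1-6 of the diagram above."""
    guard.evaluate("start_of_run", task)              # 1. policy check
    worktree = pool.acquire(task["id"])               # 2. isolated worktree
    success = False
    try:
        for _ in range(max_iterations):               # 3. iterate
            guard.evaluate("before_iteration", task)
            if not cost.can_continue(task["id"]):     #    budget gate
                break
            model = router.select_model(task)
            result = chain.execute(task["prompt"], model)
            guard.evaluate("after_tool_call", task)
            if reflexion.evaluate(result).passed:     #    quality gate
                success = True
                break
    finally:
        pool.release(worktree)                        # 4. always release
    if not success:
        dlq.enqueue(task, reason="budget_or_iterations")  # 5. dead-letter
    return success                                    # 6. caller logs the outcome event

ok = run_task({"id": "t1", "prompt": "add tests"},
              Stub(), Stub(), Stub(), Stub(), Stub(), Stub(), Stub())
```

The `try/finally` around the loop matters: the worktree must be released even when an iteration raises, otherwise the pool leaks slots across crashed runs.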
CREATE TABLE events (
record_id TEXT PRIMARY KEY,
timestamp TEXT NOT NULL,
entity_type TEXT NOT NULL,
entity_id TEXT NOT NULL,
event_type TEXT NOT NULL,
data TEXT NOT NULL,
run_id TEXT,
iteration_id INTEGER,
task_id TEXT,
mail_id TEXT,
schema_version INTEGER NOT NULL,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE snapshots (
snapshot_id TEXT PRIMARY KEY,
run_id TEXT NOT NULL,
timestamp TEXT NOT NULL,
state_data TEXT NOT NULL,
last_record_id TEXT NOT NULL,
schema_version INTEGER NOT NULL
);
-- Indexes for fast queries
CREATE INDEX idx_events_run_id ON events(run_id);
CREATE INDEX idx_events_task_id ON events(task_id);
CREATE INDEX idx_events_timestamp ON events(timestamp);
async def recover(self) -> RecoveryPlan:
"""Recover from crash using Beads event replay."""
# 1. Find latest snapshot
snapshot = await self.beads.get_latest_snapshot(run_id)
# 2. Replay events since snapshot
events = await self.beads.get_events_since(snapshot.last_record_id)
# 3. Rebuild state
state = self.recovery_manager.rebuild_state(snapshot, events)
# 4. Categorize tasks
plan = RecoveryPlan(
completed_tasks=[t for t in state.tasks if t.status == "completed"],
pending_tasks=[t for t in state.tasks if t.status == "pending"],
failed_tasks=[t for t in state.tasks if t.status == "failed"],
)
    return plan
| Metric | Value |
|---|---|
| Total Features | 72 |
| Conflicts Resolved | 7 major areas |
| Phases | 4 |
| Timeline | 8-12 weeks |
| # | Feature | Source | Effort | Impact |
|---|---|---|---|---|
| 1.1 | Provider Registry Pattern | ClaudeBar | Low | High |
| 1.2 | Completion Marker Detection | Ralph Orchestrator | Low | High |
| 1.3 | Security Masking in Logs | Ralph Orchestrator | Low | High |
| 1.4 | Fail-Safe Defaults | Safety-Net | Low | High |
| 1.5 | blackice doctor Command | ACFS | Low | High |
| 1.6 | Status Notifications | Superset | Low | High |
| 1.7 | Per-Project Configuration | Superset | Low | High |
| 1.8 | Continuation Enforcement | Oh-My-OpenCode | Low | High |
| 1.9 | Conditional Execution | Petit | Low | High |
| 1.10 | Concurrency Limits | Petit | Low | High |
| 1.11 | Multi-Step Command Chains | Claude-Workflow | Low | High |
| 1.12 | Forced Attention Recovery | Planning-with-Files | Low | High |
| # | Feature | Source | Effort | Impact |
|---|---|---|---|---|
| 2.1 | Dynamic Command Allowlisting | Auto-Claude | Medium | High |
| 2.2 | Semantic Command Analysis | Safety-Net | Medium | High |
| 2.3 | Shell Wrapper Detection | Safety-Net | Low-Med | High |
| 2.4 | Git Hook Integration | Guardian-Angel | Low | High |
| 2.5 | Content-Addressable Caching | Guardian-Angel | Low | High |
| 2.6 | Self-Validating QA Loop | Auto-Claude | Medium | High |
| 2.7 | Letter Grade Evaluation | Wayfound | Medium | High |
| 2.8 | Confidence Scoring | Quint-Code | Medium | High |
| 2.9 | Pre-Execution Guidelines | Wayfound | Low-Med | High |
| 2.10 | Three-Layer Security Sandbox | Auto-Claude | Medium | Medium |
| 2.11 | Adaptive Permission Framework | Ralph Orchestrator | Medium | Medium |
| 2.12 | Strict Mode for CI | Guardian-Angel | Low | Medium |
| # | Feature | Source | Effort | Impact |
|---|---|---|---|---|
| 3.1 | Q-Cycle Structured Reasoning | Quint-Code | Med-High | High |
| 3.2 | Resource Quota Monitoring | ClaudeBar | Medium | High |
| 3.3 | Continuity Ledger | Continuous-Claude | Medium | High |
| 3.4 | Handoff System | Continuous-Claude | Medium | High |
| 3.5 | Role-Based Model Assignment | Oh-My-OpenCode | Low | High |
| 3.6 | Proactive Agent Spawning | Claude-Workflow | Medium | High |
| 3.7 | Background Task Extraction | Acontext | Medium | High |
| 3.8 | Structured Feedback Format | Plannotator | Medium | High |
| 3.9 | Memory Persistence | Auto-Claude | Medium | Medium |
| 3.10 | Artifact Index (FTS5) | Continuous-Claude | Medium | Medium |
| 3.11 | SOP Generation | Acontext | Medium | Medium |
| 3.12 | Decision Documents | Quint-Code | Medium | Medium |
| 3.13 | Common Pitfall Analysis | Wayfound | Medium | Medium |
| 3.14 | Cascading Verification | Claude-Workflow | Medium | Medium |
| 3.15 | Validation Funnel | Continuous-Claude | Med-High | Medium |
| # | Feature | Source | Effort | Impact |
|---|---|---|---|---|
| 4.1 | Convoys (Work Bundling) | Gas Town | Low | High |
| 4.2 | OpenAI-Compatible API | MassGen | Low | High |
| 4.3 | Live Progress Visualization | MassGen | Low | High |
| 4.4 | Manifest-Driven Agent Registry | ACFS | Medium | High |
| 4.5 | GUPP (Propulsion Principle) | Gas Town | Medium | Medium |
| 4.6 | Patrol Agents (Self-Healing) | Gas Town | Medium | Medium |
| 4.7 | Cross-Model Attack Pattern | MassGen | Medium | Medium |
| 4.8 | Knowledge Sharing | MassGen | Low-Med | Medium |
| 4.9 | Background Agent Delegation | Oh-My-OpenCode | Medium | Medium |
| 4.10 | Cross-Job Dependencies | Petit | Medium | Medium |
| 4.11 | Async Human-in-the-Loop | Plannotator | Medium | Medium |
| 4.12 | Built-in Diff Viewer | Superset | Medium | Medium |
| 4.13 | 3-File State Pattern | Planning-with-Files | Low | Medium |
| 4.14 | Session Health Monitoring | Acontext | Medium | Medium |
| 4.15 | Protocol-Based DI | ClaudeBar | Medium | Medium |
Sources: Beads, Continuity Ledger, 3-File Pattern, Scratchpad
┌─────────────────────────────────────────────────┐
│ L3: Continuity Ledger (session snapshots) │ ← NEW
├─────────────────────────────────────────────────┤
│ L2: Task Workspace (3-file pattern per task) │ ← NEW
├─────────────────────────────────────────────────┤
│ L1: Agent Scratchpad (per-agent notes) │ ← NEW
├─────────────────────────────────────────────────┤
│ L0: Beads Event Store (immutable events) │ ← KEEP
└─────────────────────────────────────────────────┘
Sources: Binary pass/fail, Letter Grades, Confidence Scores
@dataclass
class QualityScore:
raw: float # 0-100 internal score
letter: str # A/B/C/D/F display grade
confidence: float # 0-1 decision confidence
breakdown: dict # Per-dimension scores
# Conversions:
# A = 90-100, B = 80-89, C = 70-79, D = 60-69, F = 0-59
# Confidence = raw / 100
Sources: Beads, Letta, Insights DB, SOP Store, Evidence Decay
┌────────────────────────────────────────────────────┐
│ L3: SOP Store │
│ Generated procedures from success patterns │
├────────────────────────────────────────────────────┤
│ L2: Insights DB (SQLite) │
│ CodebaseInsight records with decay timestamps │
├────────────────────────────────────────────────────┤
│ L1: Letta Semantic Memory │
│ Embeddings for cross-session learning │
├────────────────────────────────────────────────────┤
│ L0: Beads Event Store │
│ Immutable append-only event log │
└────────────────────────────────────────────────────┘
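The "decay timestamps" on Insights DB records imply a relevance weighting for old insights. A minimal sketch, assuming a half-life policy; the actual decay function is not specified in the sources, so the 30-day default is illustrative.

```python
from datetime import datetime, timedelta, timezone

def decayed_weight(recorded_at: datetime, now: datetime,
                   half_life_days: float = 30.0) -> float:
    """Relevance weight that halves every half_life_days."""
    age_days = (now - recorded_at).total_seconds() / 86400
    return 0.5 ** (age_days / half_life_days)

now = datetime.now(timezone.utc)
print(decayed_weight(now, now))                        # 1.0 — fresh insight
print(decayed_weight(now - timedelta(days=30), now))   # 0.5 — one half-life old
```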
Sources: SafetyGuard, Dynamic Allowlist, Semantic Analysis, Shell Unwrap, Sandbox
Command Input
│
▼
┌───────────────────────────────────┐
│ 1. Shell Unwrapper │ ← Recursively extract nested commands
└───────────────┬───────────────────┘
▼
┌───────────────────────────────────┐
│ 2. Semantic Parser │ ← Parse flags, understand combinations
└───────────────┬───────────────────┘
▼
┌───────────────────────────────────┐
│ 3. Stack Allowlist │ ← Python project? Block npm/yarn
└───────────────┬───────────────────┘
▼
┌───────────────────────────────────┐
│ 4. Policy Check (SafetyGuard) │ ← Enforce agent-specific policies
└───────────────┬───────────────────┘
▼
┌───────────────────────────────────┐
│ 5. Sandbox Execute │ ← Path restrictions, env sanitization
└───────────────────────────────────┘
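Stage 1 of the pipeline, the shell unwrapper, can be sketched with `shlex`. A simplified version: it only handles the plain `-c` wrapper form, not `env` prefixes or combined flags like `-lc`, which a production unwrapper would also need to cover.

```python
import shlex

WRAPPERS = {"bash", "sh", "zsh"}

def unwrap(command: str) -> str:
    """Recursively extract the innermost command from shell wrappers
    like `bash -c "..."` so later stages see what actually runs."""
    parts = shlex.split(command)
    while len(parts) >= 3 and parts[0] in WRAPPERS and parts[1] == "-c":
        parts = shlex.split(parts[2])
    return " ".join(parts)

print(unwrap("bash -c 'sh -c \"rm -rf /tmp/x\"'"))  # rm -rf /tmp/x
```

Without unwrapping, an allowlist that permits `bash` would wave through any payload nested inside `bash -c`; unwrapping first lets stages 2-4 judge the real command.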
Sources: Consensus, Handoff, Proactive Spawning, Background Delegation, Patrol
┌─────────────────────────────────────────────────────────────┐
│ Agent Lifecycle Manager │
├─────────────────────────────────────────────────────────────┤
│ SPAWN LAYER │
│ ├── ProactiveSpawner (pattern-triggered activation) │
│ ├── BackgroundDelegator (cheap agents for preprocessing) │
│ └── PatrolAgents (self-healing monitors) │
├─────────────────────────────────────────────────────────────┤
│ COORDINATE LAYER │
│ ├── HandoffManager (session/agent context transfer) │
│ ├── ConvoyTracker (work bundling across agents) │
│ └── ConsensusVoting (multi-agent decisions) │
├─────────────────────────────────────────────────────────────┤
│ COMMUNICATE LAYER │
│ ├── KnowledgeHub (pub/sub discoveries) │
│ └── MailSystem (inter-agent messaging) │
└─────────────────────────────────────────────────────────────┘
Sources: Per-Project, External Rules, Manifest Registry, Dual-Scope
Priority (lowest to highest):
1. Built-in Defaults
└── Hardcoded fail-safes (always active)
2. User Global: ~/.blackice/config.yaml
└── Personal preferences, API keys
3. Project Config: .blackice/config.yaml
└── Project-specific settings, models
4. Project Rules: AGENTS.md
└── Coding standards, review rules
5. Agent Manifest: .blackice/agents.yaml
└── Agent definitions, capabilities
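The five levels compose by overriding, lowest priority first. A sketch of the assumed merge semantics (nested dicts merge recursively, scalars are replaced by the higher-priority layer); the config keys are illustrative.

```python
def merge_config(*layers: dict) -> dict:
    """Merge layers lowest-priority first; later layers win,
    nested dicts merge recursively, scalars are replaced."""
    merged: dict = {}
    for layer in layers:
        for key, value in layer.items():
            if isinstance(value, dict) and isinstance(merged.get(key), dict):
                merged[key] = merge_config(merged[key], value)
            else:
                merged[key] = value
    return merged

defaults    = {"model": "ollama/qwen2.5-coder", "safety": {"max_iterations": 10}}
user_global = {"model": "claude-sonnet-4-20250514"}          # ~/.blackice/config.yaml
project     = {"safety": {"max_iterations": 5}}              # .blackice/config.yaml

cfg = merge_config(defaults, user_global, project)
# model comes from the user layer, max_iterations from the project layer
assert cfg == {"model": "claude-sonnet-4-20250514", "safety": {"max_iterations": 5}}
```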
Sources: LLMRouter, Role-Based, Provider Registry, Cross-Model Attack
class EnhancedLLMRouter:
"""Unified model routing with all strategies."""
def __init__(self):
self.registry = ProviderRegistry() # Self-registering providers
self.role_map = RoleModelMap() # Role → preferred model
self.capability_map = CapabilityMap() # Task type → requirements
def select(self, task: Task, strategy: str = "auto") -> list[str]:
if strategy == "role":
return [self.role_map.get(task.agent_role)]
elif strategy == "capability":
return [self.capability_map.match(task)]
elif strategy == "parallel":
return self._select_diverse_models(task, n=3)
else: # auto
return [self._smart_select(task)]
class QPhase(Enum):
Q0_INIT = "init" # Define problem
Q1_HYPOTHESIZE = "hypothesize" # Generate alternatives
Q2_SUPPORT = "support" # Gather evidence
Q3_CHALLENGE = "challenge" # Find counter-evidence
Q4_AUDIT = "audit" # Check biases
Q5_DECIDE = "decide" # Make decision
@dataclass
class QCycleState:
phase: QPhase
problem: str
hypotheses: list[dict] # {id, description, confidence}
evidence: list[dict] # {id, hypothesis_id, type, content, weight}
challenges: list[dict] # {id, hypothesis_id, content}
audit_results: dict # {biases_found, confidence_adjustments}
decision: dict | None # {hypothesis_id, rationale, confidence}
class QCycleRunner:
async def run_cycle(self, problem: str) -> QCycleState:
state = QCycleState(phase=QPhase.Q0_INIT, problem=problem, ...)
state = await self._q1_hypothesize(state) # Generate 3-5 hypotheses
state = await self._q2_support(state) # Gather supporting evidence
state = await self._q3_challenge(state) # Find challenges
state = await self._q4_audit(state) # Check for biases
state = await self._q5_decide(state) # Make decision
return state
@dataclass
class StackProfile:
name: str
indicators: list[str] # Files that indicate this stack
allowed_commands: list[str]
package_managers: list[str]
test_commands: list[str]
STACK_PROFILES = [
StackProfile(
name="python",
indicators=["pyproject.toml", "setup.py", "requirements.txt"],
allowed_commands=["python", "pip", "uv", "pytest", "ruff", "mypy"],
package_managers=["pip", "uv", "pipenv", "poetry"],
test_commands=["pytest", "python -m pytest"],
),
StackProfile(
name="node",
indicators=["package.json", "yarn.lock", "pnpm-lock.yaml"],
allowed_commands=["node", "npm", "npx", "yarn", "pnpm", "bun"],
package_managers=["npm", "yarn", "pnpm", "bun"],
test_commands=["npm test", "yarn test", "jest", "vitest"],
),
# ... rust, go, etc.
]
class DynamicAllowlist:
def is_allowed(self, command: str) -> bool:
base_cmd = command.split()[0]
return base_cmd in self.allowed
class ProviderRegistry:
_providers: dict[str, Type[LLMProvider]] = {}
@classmethod
def register(cls, name: str):
def decorator(provider_class):
cls._providers[name] = provider_class
return provider_class
return decorator
@classmethod
def create(cls, name: str, **config) -> LLMProvider:
return cls._providers[name](**config)
@ProviderRegistry.register("claude")
class ClaudeProvider:
async def generate(self, prompt: str, **kwargs) -> str: ...
async def get_quota(self) -> ProviderQuota: ...
@ProviderRegistry.register("ollama")
class OllamaProvider:
async def generate(self, prompt: str, **kwargs) -> str: ...
class QuotaStatus(Enum):
HEALTHY = "healthy" # >50%
WARNING = "warning" # 20-50%
CRITICAL = "critical" # <20%
DEPLETED = "depleted" # 0%
@dataclass
class ProviderQuota:
provider: str
used: int
limit: int
unit: str # "tokens", "requests", "minutes"
reset_at: datetime | None
@property
def remaining(self) -> int:
return max(0, self.limit - self.used)
@property
def status(self) -> QuotaStatus:
pct = (self.remaining / self.limit) * 100
if pct == 0: return QuotaStatus.DEPLETED
if pct < 20: return QuotaStatus.CRITICAL
if pct < 50: return QuotaStatus.WARNING
return QuotaStatus.HEALTHY
# .git/hooks/pre-commit
#!/usr/bin/env python3
"""Pre-commit hook for BLACKICE code review."""
import subprocess
import sys
from pathlib import Path
def get_staged_files() -> list[Path]:
result = subprocess.run(
["git", "diff", "--cached", "--name-only", "--diff-filter=ACM"],
capture_output=True, text=True
)
return [Path(f) for f in result.stdout.strip().split("\n") if f]
def main():
files = get_staged_files()
patterns = ["*.py", "*.ts", "*.js"]
reviewable = [f for f in files if should_review(f, patterns)]
if not reviewable:
sys.exit(0)
passed, message = run_review(reviewable)
if not passed:
print(f"❌ Review failed:
{message}")
sys.exit(1)
print("✅ Review passed")
sys.exit(0)| Service | Port | Purpose |
|---|---|---|
| Ollama | 11434 | Local LLM inference (3090 GPU) |
| Letta | 8283 | Stateful AI agents with persistent memory |
| PostgreSQL | 5432 | Database for Letta |
| LiteLLM | 4000 | Unified LLM gateway |
| LLMRouter | 4001 | Intelligent model routing |
| Claude Proxy | 42069 | Claude API proxy (192.168.1.143) |
services:
  ollama:
    image: ollama/ollama:latest
    ports: ["11434:11434"]
    volumes:
      - ollama_data:/root/.ollama
    deploy:
      resources:
        reservations:
          devices:
            - capabilities: [gpu]
  letta:
    image: letta/letta:latest
    ports: ["8283:8283"]
    environment:
      - LETTA_PG_URI=postgresql://letta:letta@postgres:5432/letta
    depends_on: [postgres]
  postgres:
    image: postgres:16
    ports: ["5432:5432"]
    environment:
      - POSTGRES_USER=letta
      - POSTGRES_PASSWORD=letta
      - POSTGRES_DB=letta

# Check all services
blackice doctor
# Expected output:
# ✅ Ollama: http://localhost:11434 (running)
# ✅ Letta: http://localhost:8283 (running)
# ✅ PostgreSQL: localhost:5432 (running)
# ✅ Beads DB: ~/.beads/ralph.db (exists)
# ✅ Worktree Pool: /tmp/ralph-worktrees (clean)

# Clone repository
git clone https://github.com/yourorg/blackice.git
cd blackice
# Install Python dependencies
uv pip install -e ".[dev]"
# Start infrastructure
docker compose up -d

# Create project configuration
blackice init
# Verify setup
blackice doctor

# Simple task
blackice run "Add error handling to api.py"
# With specific model
blackice run --model claude-sonnet-4 "Refactor authentication module"
# Parallel execution (DAG)
blackice run --dag workflow.yaml

# Resume from crash
blackice recover
# View dead letter queue
blackice dlq list
# Retry failed tasks
blackice dlq retry --all

- Gas Town - Convoys, GUPP, Patrol Agents
- BLACKICE Complete - Core architecture
- Superset - Per-project config
- MassGen - Cross-model attack
- ACFS - Manifest registry
- Oh-My-OpenCode - Role-based routing
- Ralph Orchestrator - Completion markers
- Wayfound - Letter grades
- Plannotator - Structured feedback
- Petit - Concurrency limits
- Planning-with-Files - 3-file pattern
- Acontext - SOP generation
- Claude-Workflow-v2 - Proactive spawning
- Claude-Code-Safety-Net - Semantic analysis
- Continuous-Claude-v2 - Continuity ledger
- Auto-Claude - Dynamic allowlist
- Guardian-Angel - Git hooks
- Quint-Code - Q-Cycle reasoning
- ClaudeBar - Quota monitoring
- blackice doctor passes on fresh install
- Per-project config loads correctly
- Completion markers detected in agent output
- Status notifications working
- Command safety pipeline blocks dangerous commands
- Git pre-commit hooks run reviews
- Letter grades assigned to all task outputs
- CI strict mode fails on ambiguous results
- Q-Cycle produces structured decisions
- Handoffs transfer context between sessions
- SOPs generated from 3+ similar successes
- Quota monitoring alerts at thresholds
- OpenAI API wrapper serves requests
- Convoys track bundled work delivery
- Patrol agents recover stuck tasks
- Cross-model attack improves solution quality
Generated by BLACKICE Context Drop Generator v1.0
Original gist: c20aa4f397cade28d885902d6b58aef7
BLACKICE Ultimate Features Roadmap - Consolidated from 19 Project Analyses
Consolidated from 19 gists analyzing Gas Town, Superset, MassGen, ACFS, Oh-My-OpenCode, Ralph Orchestrator, Wayfound, Plannotator, Petit, Planning-with-Files, Acontext, Claude-Workflow-v2, Claude-Code-Safety-Net, Continuous-Claude-v2, Auto-Claude, Gentleman-Guardian-Angel, Quint-Code, and ClaudeBar.
Total Features Identified: 72 Conflicts Resolved: 7 major areas Phases: 4 (Foundation → Safety → Intelligence → Polish) Estimated Timeline: 8-12 weeks for full implementation
Sources in conflict:
- Beads Event Store (existing) - append-only SQLite events
- Continuity Ledger (Continuous-Claude) - explicit state snapshots
- 3-File State Pattern (Planning-with-Files) - plan/notes/output
- Scratchpad Persistence (Ralph Orchestrator) - markdown notes
Resolution: Layered State System
┌─────────────────────────────────────────────────┐
│ L3: Continuity Ledger (session snapshots) │ ← NEW (view over Beads)
├─────────────────────────────────────────────────┤
│ L2: Task Workspace (3-file pattern per task) │ ← NEW
├─────────────────────────────────────────────────┤
│ L1: Agent Scratchpad (per-agent notes) │ ← NEW
├─────────────────────────────────────────────────┤
│ L0: Beads Event Store (immutable events) │ ← KEEP (foundation)
└─────────────────────────────────────────────────┘
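One way to keep L0 as the single source of truth is to compute the higher layers as derived views rather than separate stores. A minimal sketch of the L3 Continuity Ledger as a fold over the event log — the `Event` shape and snapshot fields here are illustrative, not the actual Beads schema:

```python
from dataclasses import dataclass

@dataclass
class Event:
    session_id: str
    kind: str      # e.g. "task_started", "task_done"
    payload: dict

def session_snapshot(events: list[Event], session_id: str) -> dict:
    """Fold the immutable L0 event log into one L3 session snapshot."""
    snapshot: dict = {"session_id": session_id, "tasks_done": []}
    for ev in events:
        if ev.session_id != session_id:
            continue
        if ev.kind == "task_done":
            snapshot["tasks_done"].append(ev.payload.get("task"))
    return snapshot
```

Because the snapshot is recomputable from L0, a corrupted or stale ledger can always be rebuilt, which is what makes the layering safe.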
Sources in conflict:
- Binary pass/fail (existing Reflexion)
- Letter Grades A-F (Wayfound)
- Confidence Scores 0-1 (Quint-Code)
Resolution: Unified Scoring System
@dataclass
class QualityScore:
    raw: float         # 0-100 internal score
    letter: str        # A/B/C/D/F display grade
    confidence: float  # 0-1 decision confidence
    breakdown: dict    # Per-dimension scores

# Conversions:
# A = 90-100 (excellent)
# B = 80-89  (good)
# C = 70-79  (acceptable)
# D = 60-69  (needs work)
# F = 0-59   (failed)
# Confidence = raw / 100

Sources in conflict:

- Beads events (existing)
- Letta semantic memory (existing)
- Memory Persistence (Auto-Claude) - insights
- SOP Generation (Acontext) - procedures
- Evidence Decay (Quint-Code) - aging
Resolution: 4-Layer Memory Architecture
┌────────────────────────────────────────────────────┐
│ L3: SOP Store │
│ Generated procedures from success patterns │
├────────────────────────────────────────────────────┤
│ L2: Insights DB (SQLite) │
│ CodebaseInsight records with decay timestamps │
├────────────────────────────────────────────────────┤
│ L1: Letta Semantic Memory │
│ Embeddings for cross-session learning │
├────────────────────────────────────────────────────┤
│ L0: Beads Event Store │
│ Immutable append-only event log │
└────────────────────────────────────────────────────┘
Sources in conflict:
- SafetyGuard (existing) - policy enforcement
- Dynamic Command Allowlisting (Auto-Claude) - stack-aware
- Semantic Command Analysis (Safety-Net) - flag parsing
- Shell Wrapper Detection (Safety-Net) - recursive unwrap
- Three-Layer Sandbox (Auto-Claude) - defense in depth
Resolution: 5-Stage Safety Pipeline
Command Input
│
▼
┌───────────────────────────────────┐
│ 1. Shell Unwrapper │ ← Recursively extract nested commands
│ bash -c "..." → actual command │
└───────────────┬───────────────────┘
▼
┌───────────────────────────────────┐
│ 2. Semantic Parser │ ← Parse flags, understand combinations
│ git checkout -b vs checkout -- │
└───────────────┬───────────────────┘
▼
┌───────────────────────────────────┐
│ 3. Stack Allowlist │ ← Python project? Block npm/yarn
│ Dynamic per-project filtering │
└───────────────┬───────────────────┘
▼
┌───────────────────────────────────┐
│ 4. Policy Check (SafetyGuard) │ ← Enforce agent-specific policies
│ Loop detection, budget check │
└───────────────┬───────────────────┘
▼
┌───────────────────────────────────┐
│ 5. Sandbox Execute │ ← Path restrictions, env sanitization
│ Three-layer isolation │
└───────────────────────────────────┘
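Stage 1 (Shell Unwrapper) can be sketched with `shlex`; the wrapper set and `-c`-only handling below are simplifying assumptions — a real implementation would also cover `env`, `xargs`, combined flags, and similar indirection:

```python
import shlex

# Assumption: these binaries are treated as pure wrappers around `-c "..."`.
WRAPPERS = {"bash", "sh", "zsh"}

def unwrap(command: str) -> str:
    """Peel nested `sh -c '...'` layers until a plain command remains."""
    while True:
        parts = shlex.split(command)
        if len(parts) >= 3 and parts[0] in WRAPPERS and parts[1] == "-c":
            command = parts[2]  # the quoted inner command string
        else:
            return command
```

The unwrapped string then feeds stages 2-5, so `bash -c "rm -rf /"` is judged as `rm -rf /`, not as an innocuous `bash` invocation.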
Sources in conflict:
- Consensus voting (existing)
- Handoff System (Continuous-Claude)
- Proactive Spawning (Claude-Workflow)
- Background Delegation (Oh-My-OpenCode)
- Patrol Agents (Gas Town)
Resolution: Unified Agent Lifecycle
┌─────────────────────────────────────────────────────────────┐
│ Agent Lifecycle Manager │
├─────────────────────────────────────────────────────────────┤
│ SPAWN LAYER │
│ ├── ProactiveSpawner (pattern-triggered activation) │
│ ├── BackgroundDelegator (cheap agents for preprocessing) │
│ └── PatrolAgents (self-healing monitors) │
├─────────────────────────────────────────────────────────────┤
│ COORDINATE LAYER │
│ ├── HandoffManager (session/agent context transfer) │
│ ├── ConvoyTracker (work bundling across agents) │
│ └── ConsensusVoting (multi-agent decisions) │
├─────────────────────────────────────────────────────────────┤
│ COMMUNICATE LAYER │
│ ├── KnowledgeHub (pub/sub discoveries) │
│ └── MailSystem (inter-agent messaging) │
└─────────────────────────────────────────────────────────────┘
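As a hypothetical sketch of the COMMUNICATE layer, the KnowledgeHub could be a minimal in-process pub/sub for agent discoveries (class and method names here are illustrative, not from any of the source gists):

```python
from collections import defaultdict
from typing import Callable

class KnowledgeHub:
    """Minimal pub/sub: agents publish discoveries; subscribers react."""

    def __init__(self) -> None:
        self._subscribers: dict[str, list[Callable[[dict], None]]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Callable[[dict], None]) -> None:
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, discovery: dict) -> None:
        for handler in self._subscribers[topic]:
            handler(discovery)
```

A patrol agent might subscribe to a "stuck_task" topic while workers publish to it, keeping the SPAWN and COORDINATE layers decoupled.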
Sources in conflict:
- Per-Project Config (Superset) - .blackice/config.yaml
- External Rules File (Guardian-Angel) - AGENTS.md
- Manifest-Driven Registry (ACFS) - agents.yaml
- Dual-Scope Config (Safety-Net) - user + project
Resolution: 5-Level Configuration Cascade
Priority (lowest to highest):
1. Built-in Defaults
└── Hardcoded fail-safes (always active)
2. User Global: ~/.blackice/config.yaml
└── Personal preferences, API keys
3. Project Config: .blackice/config.yaml
└── Project-specific settings, models
4. Project Rules: AGENTS.md
└── Coding standards, review rules
5. Agent Manifest: .blackice/agents.yaml
└── Agent definitions, capabilities
Merge strategy: Deep merge, later overrides earlier
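The merge strategy can be sketched as a recursive dict merge applied from lowest to highest priority (the layer names in the comment are the cascade levels above; `deep_merge` itself is an illustrative helper):

```python
def deep_merge(base: dict, override: dict) -> dict:
    """Recursively merge: nested dicts combine, other values override."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged

# config = builtin_defaults
# for layer in [user_global, project_config, project_rules, agent_manifest]:
#     config = deep_merge(config, layer)
```

Because merging is per-key rather than per-file, a project config can override one model setting without redeclaring the user's entire provider block.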
Sources in conflict:
- LLMRouter (existing) - capability selection
- Role-Based Assignment (Oh-My-OpenCode)
- Provider Registry (ClaudeBar) - self-registration
- Cross-Model Attack (MassGen) - parallel execution
Resolution: Enhanced LLMRouter
class EnhancedLLMRouter:
    """Unified model routing with all strategies."""
    def __init__(self):
        self.registry = ProviderRegistry()     # Self-registering providers
        self.role_map = RoleModelMap()         # Role → preferred model
        self.capability_map = CapabilityMap()  # Task type → requirements

    def select(self, task: Task, strategy: str = "auto") -> list[str]:
        if strategy == "role":
            return [self.role_map.get(task.agent_role)]
        elif strategy == "capability":
            return [self.capability_map.match(task)]
        elif strategy == "parallel":
            return self._select_diverse_models(task, n=3)
        else:  # auto
            return [self._smart_select(task)]

Theme: Core infrastructure and quick wins
| # | Feature | Source | Effort | Impact |
|---|---|---|---|---|
| 1.1 | Provider Registry Pattern | ClaudeBar | Low | High |
| 1.2 | Completion Marker Detection | Ralph Orchestrator | Low | High |
| 1.3 | Security Masking in Logs | Ralph Orchestrator | Low | High |
| 1.4 | Fail-Safe Defaults | Safety-Net | Low | High |
| 1.5 | blackice doctor Health Command | ACFS | Low | High |
| 1.6 | Status Notifications | Superset | Low | High |
| 1.7 | Per-Project Configuration | Superset | Low | High |
| 1.8 | Continuation Enforcement | Oh-My-OpenCode | Low | High |
| 1.9 | Conditional Execution Semantics | Petit | Low | High |
| 1.10 | Concurrency Limits | Petit | Low | High |
| 1.11 | Multi-Step Command Chains | Claude-Workflow | Low | High |
| 1.12 | Forced Attention Recovery | Planning-with-Files | Low | High |
Deliverable: Robust CLI with better defaults, project configuration, and basic safety
Theme: Defense in depth and quality gates
| # | Feature | Source | Effort | Impact |
|---|---|---|---|---|
| 2.1 | Dynamic Command Allowlisting | Auto-Claude | Medium | High |
| 2.2 | Semantic Command Analysis | Safety-Net | Medium | High |
| 2.3 | Shell Wrapper Detection | Safety-Net | Low-Med | High |
| 2.4 | Git Hook Integration | Guardian-Angel | Low | High |
| 2.5 | Content-Addressable Caching | Guardian-Angel | Low | High |
| 2.6 | Self-Validating QA Loop | Auto-Claude | Medium | High |
| 2.7 | Letter Grade Evaluation | Wayfound | Medium | High |
| 2.8 | Confidence Scoring | Quint-Code | Medium | High |
| 2.9 | Pre-Execution Guidelines Query | Wayfound | Low-Med | High |
| 2.10 | Three-Layer Security Sandbox | Auto-Claude | Medium | Medium |
| 2.11 | Adaptive Permission Framework | Ralph Orchestrator | Medium | Medium |
| 2.12 | Strict Mode for CI | Guardian-Angel | Low | Medium |
Deliverable: Production-ready safety layer with quality-gated execution
Theme: Learning, memory, and structured reasoning
| # | Feature | Source | Effort | Impact |
|---|---|---|---|---|
| 3.1 | Q-Cycle Structured Reasoning | Quint-Code | Med-High | High |
| 3.2 | Resource Quota Monitoring | ClaudeBar | Medium | High |
| 3.3 | Continuity Ledger | Continuous-Claude | Medium | High |
| 3.4 | Handoff System | Continuous-Claude | Medium | High |
| 3.5 | Role-Based Model Assignment | Oh-My-OpenCode | Low | High |
| 3.6 | Proactive Agent Spawning | Claude-Workflow | Medium | High |
| 3.7 | Background Task Extraction | Acontext | Medium | High |
| 3.8 | Structured Feedback Format | Plannotator | Medium | High |
| 3.9 | Memory Persistence Across Sessions | Auto-Claude | Medium | Medium |
| 3.10 | Artifact Index (SQLite+FTS5) | Continuous-Claude | Medium | Medium |
| 3.11 | SOP Generation from Success | Acontext | Medium | Medium |
| 3.12 | Decision Documents | Quint-Code | Medium | Medium |
| 3.13 | Common Pitfall Analysis | Wayfound | Medium | Medium |
| 3.14 | Cascading Verification | Claude-Workflow | Medium | Medium |
| 3.15 | Validation Funnel | Continuous-Claude | Med-High | Medium |
Deliverable: Self-improving system with persistent learning and structured decisions
Theme: Enterprise features and ecosystem
| # | Feature | Source | Effort | Impact |
|---|---|---|---|---|
| 4.1 | Convoys (Work Bundling) | Gas Town | Low | High |
| 4.2 | OpenAI-Compatible API Wrapper | MassGen | Low | High |
| 4.3 | Live Progress Visualization | MassGen | Low | High |
| 4.4 | Manifest-Driven Agent Registry | ACFS | Medium | High |
| 4.5 | GUPP (Propulsion Principle) | Gas Town | Medium | Medium |
| 4.6 | Patrol Agents (Self-Healing) | Gas Town | Medium | Medium |
| 4.7 | Cross-Model Attack Pattern | MassGen | Medium | Medium |
| 4.8 | Notification-Based Knowledge Sharing | MassGen | Low-Med | Medium |
| 4.9 | Background Agent Delegation | Oh-My-OpenCode | Medium | Medium |
| 4.10 | Cross-Job Dependencies | Petit | Medium | Medium |
| 4.11 | Async Human-in-the-Loop | Plannotator | Medium | Medium |
| 4.12 | Built-in Diff Viewer | Superset | Medium | Medium |
| 4.13 | 3-File State Pattern | Planning-with-Files | Low | Medium |
| 4.14 | Session Health Monitoring | Acontext | Medium | Medium |
| 4.15 | Protocol-Based DI | ClaudeBar | Medium | Medium |
Deliverable: Enterprise-ready platform with full ecosystem integration
| Feature | Phase | Effort | Source |
|---|---|---|---|
| Proactive Agent Spawning | 3 | Medium | Claude-Workflow |
| Background Agent Delegation | 4 | Medium | Oh-My-OpenCode |
| Handoff System | 3 | Medium | Continuous-Claude |
| Patrol Agents | 4 | Medium | Gas Town |
| Convoys (Work Bundling) | 4 | Low | Gas Town |
| Cross-Job Dependencies | 4 | Medium | Petit |
| Feature | Phase | Effort | Source |
|---|---|---|---|
| Dynamic Command Allowlisting | 2 | Medium | Auto-Claude |
| Semantic Command Analysis | 2 | Medium | Safety-Net |
| Shell Wrapper Detection | 2 | Low-Med | Safety-Net |
| Three-Layer Sandbox | 2 | Medium | Auto-Claude |
| Security Masking | 1 | Low | Ralph Orchestrator |
| Fail-Safe Defaults | 1 | Low | Safety-Net |
| Adaptive Permissions | 2 | Medium | Ralph Orchestrator |
| Feature | Phase | Effort | Source |
|---|---|---|---|
| Letter Grade Evaluation | 2 | Medium | Wayfound |
| Confidence Scoring | 2 | Medium | Quint-Code |
| Self-Validating QA Loop | 2 | Medium | Auto-Claude |
| Cascading Verification | 3 | Medium | Claude-Workflow |
| Strict Mode for CI | 2 | Low | Guardian-Angel |
| Feature | Phase | Effort | Source |
|---|---|---|---|
| Memory Persistence | 3 | Medium | Auto-Claude |
| Artifact Index (FTS5) | 3 | Medium | Continuous-Claude |
| SOP Generation | 3 | Medium | Acontext |
| Decision Documents | 3 | Medium | Quint-Code |
| Evidence Decay | Backlog | Medium | Quint-Code |
| Continuity Ledger | 3 | Medium | Continuous-Claude |
| Feature | Phase | Effort | Source |
|---|---|---|---|
| Q-Cycle Structured Reasoning | 3 | Med-High | Quint-Code |
| Forced Attention Recovery | 1 | Low | Planning-with-Files |
| Pre-Execution Guidelines | 2 | Low-Med | Wayfound |
| Validation Funnel | 3 | Med-High | Continuous-Claude |
| Common Pitfall Analysis | 3 | Medium | Wayfound |
| Feature | Phase | Effort | Source |
|---|---|---|---|
| Provider Registry | 1 | Low | ClaudeBar |
| Per-Project Config | 1 | Low | Superset |
| Manifest-Driven Registry | 4 | Medium | ACFS |
| blackice doctor | 1 | Low | ACFS |
| Protocol-Based DI | 4 | Medium | ClaudeBar |
| Feature | Phase | Effort | Source |
|---|---|---|---|
| Git Hook Integration | 2 | Low | Guardian-Angel |
| Content-Addressable Cache | 2 | Low | Guardian-Angel |
| Status Notifications | 1 | Low | Superset |
| Live Progress Visualization | 4 | Low | MassGen |
| Multi-Step Command Chains | 1 | Low | Claude-Workflow |
| OpenAI-Compatible API | 4 | Low | MassGen |
| Built-in Diff Viewer | 4 | Medium | Superset |
graph TD
subgraph "Phase 1: Foundation"
P1[Provider Registry] --> P2[Role-Based Routing]
P3[Per-Project Config] --> P4[External Rules File]
P5[Completion Markers] --> P6[Continuation Enforcement]
end
subgraph "Phase 2: Safety"
P1 --> S1[Dynamic Allowlisting]
S2[Shell Unwrapper] --> S3[Semantic Analysis]
S3 --> S1
S1 --> S4[Safety Pipeline]
S5[QA Loop] --> S6[Letter Grades]
S6 --> S7[Confidence Scoring]
end
subgraph "Phase 3: Intelligence"
S7 --> I1[Q-Cycle Reasoning]
P4 --> I2[Pre-Execution Guidelines]
I3[Continuity Ledger] --> I4[Handoff System]
I5[Memory Persistence] --> I6[SOP Generation]
I1 --> I7[Decision Documents]
end
subgraph "Phase 4: Scale"
I4 --> E1[Convoys]
I6 --> E2[Patrol Agents]
P2 --> E3[Cross-Model Attack]
I5 --> E4[Knowledge Sharing]
end
| Rank | Feature | Phase | Effort | Source |
|---|---|---|---|---|
| 1 | Q-Cycle Structured Reasoning | 3 | Med-High | Quint-Code |
| 2 | Dynamic Command Allowlisting | 2 | Medium | Auto-Claude |
| 3 | Continuity Ledger + Handoff | 3 | Medium | Continuous-Claude |
| 4 | Self-Validating QA Loop | 2 | Medium | Auto-Claude |
| 5 | Letter Grade Evaluation | 2 | Medium | Wayfound |
| 6 | Provider Registry Pattern | 1 | Low | ClaudeBar |
| 7 | Git Hook Integration | 2 | Low | Guardian-Angel |
| 8 | Quota Monitoring | 3 | Medium | ClaudeBar |
| 9 | Proactive Agent Spawning | 3 | Medium | Claude-Workflow |
| 10 | Semantic Command Analysis | 2 | Medium | Safety-Net |
| 11 | Completion Marker Detection | 1 | Low | Ralph Orchestrator |
| 12 | Role-Based Model Assignment | 3 | Low | Oh-My-OpenCode |
| 13 | Per-Project Configuration | 1 | Low | Superset |
| 14 | Confidence Scoring | 2 | Medium | Quint-Code |
| 15 | Background Task Extraction | 3 | Medium | Acontext |
| 16 | Forced Attention Recovery | 1 | Low | Planning-with-Files |
| 17 | Content-Addressable Caching | 2 | Low | Guardian-Angel |
| 18 | SOP Generation | 3 | Medium | Acontext |
| 19 | OpenAI-Compatible API | 4 | Low | MassGen |
| 20 | Convoys (Work Bundling) | 4 | Low | Gas Town |
| Feature | Source | Reason |
|---|---|---|
| Desktop Electron UI | Superset | Cross-platform CLI is sufficient |
| Pure Bash Implementation | Guardian-Angel | Python provides better functionality |
| MCP Server Architecture | Quint-Code | BLACKICE has its own architecture |
| Braintrust Integration | Continuous-Claude | External dependency, Beads is sufficient |
| RepoPrompt Dependency | Continuous-Claude | Paid tool, open alternatives exist |
| AGPL License | Auto-Claude | Too restrictive, BLACKICE is MIT |
| MEOW Workflow DSL | Gas Town | High effort, DAGExecutor is sufficient |
| Visual Plan Editing UI | Plannotator | CLI-first approach preferred |
- blackice doctor passes on fresh install
- Per-project config loads correctly
- Completion markers detected in agent output
- Status notifications working
- Command safety pipeline blocks dangerous commands
- Git pre-commit hooks run reviews
- Letter grades assigned to all task outputs
- CI strict mode fails on ambiguous results
- Q-Cycle produces structured decisions
- Handoffs transfer context between sessions
- SOPs generated from 3+ similar successes
- Quota monitoring alerts at thresholds
- OpenAI API wrapper serves requests
- Convoys track bundled work delivery
- Patrol agents recover stuck tasks
- Cross-model attack improves solution quality
All ideas sourced from these gists:
- Gas Town
- BLACKICE Complete
- Superset
- MassGen
- ACFS
- Oh-My-OpenCode
- Ralph Orchestrator
- Wayfound MCP Supervisor
- Plannotator
- Petit
- Planning-with-Files
- Acontext
- Claude-Workflow-v2
- Claude-Code-Safety-Net
- Continuous-Claude-v2
- Auto-Claude
- Gentleman-Guardian-Angel
- Quint-Code
- ClaudeBar
Original gist: 279ab5b2bc8c1fdb4606a41509ecd614
BLACKICE 2.0 Naming Schemes: 3 options for repo + 8 primitives (Obsidian Foundry / Operant / IRONCLAD)
Source: GPT-5.2-pro naming analysis Date: January 8, 2026
Two-layer strategy:
- Layer 1 (Brand/repo): Metaphorical is fine — what people remember
- Layer 2 (Primitives): Function-first — engineers live in these names
Keeps BLACKICE "black/glass" feel but shifts from "hazard" to "craft"
| Primitive | Name | Meaning |
|---|---|---|
| Main orchestration loop | TemperLoop | Repeated heating/cooling → stronger metal |
| Spec/validation layer | BlueprintGate | Specs are blueprints; validation is a gate |
| Receipt/audit chain | ImprintLedger | Each run leaves an imprint in append-only ledger |
| Multi-agent consensus | GuildQuorum | Guild = skilled workers; quorum = decision threshold |
| Recovery/continuation | Reforge | Recover, resume, rebuild |
| Safety guard pipeline | ShieldLine | Safety line on factory floor |
| Cost/budget tracker | FuelMeter | Fuel = tokens/time/$; meter = live accounting |
| Memory/learning layer | AlloyMemory | Learning combines experiences into stronger alloys |
Best for: Product identity + "software factory" feel
"Operant" = learning by doing (trial → feedback → adaptation) + operating
| Primitive | Name | Meaning |
|---|---|---|
| Main orchestration loop | Supervisor | Owns lifecycle: schedule → execute → evaluate → retry |
| Spec/validation layer | ContractEngine | Vision → contracts (specs), validates, produces DAG |
| Receipt/audit chain | AttestationChain | Cryptographic provenance attestations |
| Multi-agent consensus | Quorum | Standard term for consensus |
| Recovery/continuation | ContinuityManager | Checkpoints, resumption, dead letters, rollbacks |
| Safety guard pipeline | PolicyGateway | All commands pass through policy + sandbox gates |
| Cost/budget tracker | CostMeter | Standard cloud billing metaphor |
| Memory/learning layer | LearningStore | SOPs, embeddings, insights, run summaries |
Best for: Enterprise platform clarity, onboarding, maintainability
Already means "guaranteed/reliable" in business language
| Primitive | Name | Backronym |
|---|---|---|
| Main orchestration loop | SPIRAL | Self-improving Process for Iteration, Reflection, And Learning |
| Spec/validation layer | CHARTER | Canonical Handoff And Requirements Traceability for Execution & Review |
| Receipt/audit chain | SEAL | Signed Execution Attestation Ledger |
| Multi-agent consensus | QUORUM | Quality-Weighted Unified Resolution Of Multiple agents |
| Recovery/continuation | RESUME | Recovery & Execution State for Unfinished Missions Engine |
| Safety guard pipeline | AEGIS | Allowlist-Enforced Guardrails & Isolation Stack |
| Cost/budget tracker | METER | Monetary & Token Expenditure Recorder |
| Memory/learning layer | PRISM | Persistent Reasoning & Insight Store for Mastery |
Best for: Brand cohesion, enterprise assurance language, compliance contexts
Hybrid approach:
Use IRONCLAD (brand/repo) + Scheme 2 internals (Supervisor, ContractEngine, PolicyGateway, etc.)
Gives you marketing strength and engineering clarity.
| Aspect | Obsidian Foundry | Operant | IRONCLAD |
|---|---|---|---|
| Vibe | Craft/Industrial | Platform/Technical | Enterprise/Assurance |
| Memorability | High | Medium | High |
| Enterprise-safe | Medium | High | Very High |
| Metaphor risk | Medium | Low | Low |
| Brand strength | High | Medium | Very High |
| If you want... | Pick this |
|---|---|
| Product identity + "software factory" feel | Obsidian Foundry |
| Enterprise platform clarity (integrate, extend, audit) | Operant |
| Brandable umbrella that sells "guarantees" | IRONCLAD |
| Best of both worlds | IRONCLAD repo + Operant internals |
Naming schemes by GPT-5.2-pro via Oracle, January 8, 2026