Consolidated from 19 tool/project analyses (Jan 7-8, 2026). Each section contains ideas worth adopting for BLACKICE from a different open-source tool.
<!-- Source Gist 1 of 19: 2b8159ee806769c1358481bc20b2c70b -->
Agentic Coding Flywheel Setup (ACFS) Ideas for BLACKICE
Ideas from ACFS that could improve BLACKICE.
ACFS transforms a fresh Ubuntu VPS into a fully-configured AI development environment in ~30 minutes via a single command.
| Aspect | ACFS | BLACKICE |
|---|---|---|
| Focus | Bootstrap AI dev environment | Run autonomous coding tasks |
| Platform | Ubuntu VPS | Any (Python) |
| Pattern | Manifest → Generate → Install | Ralph Loop + Consensus |
| State | ~/.acfs/state.json | Beads event store |
| Verification | acfs doctor | No unified health check |
- Manifest-Driven Generation - YAML defines all tools, TypeScript generates installers
- Idempotent Installation - Safe re-runs, resume from interruption
- Security Verification - SHA256 checksums for all upstream scripts
- Doctor Health Checks - Single command verifies entire stack
- Modular Categories - 11 installer categories, independently testable
What it is: Single YAML file defines all agents, their capabilities, models, and verification commands.
Current BLACKICE approach: Hardcoded adapters in Python code.
Why adopt: Change agent config without code changes. Generate docs, CLI help, and validation from one source.
Implementation sketch:
```yaml
# blackice-manifest.yaml
version: "1.0"

agents:
  claude-coder:
    description: "Primary coding agent using Claude"
    adapter: claude_proxy
    model: claude-sonnet-4-20250514
    capabilities:
      - code_generation
      - code_review
      - refactoring
    verification:
      command: "curl -s http://localhost:42069/health"
      expected: "ok"
    config:
      max_tokens: 8192
      temperature: 0.7

  ollama-fast:
    description: "Fast local inference for iteration"
    adapter: ollama
    model: qwen2.5-coder:7b
    capabilities:
      - code_generation
      - quick_iteration
    verification:
      command: "curl -s http://localhost:11434/api/tags | jq '.models | length'"
      expected_min: 1
    config:
      max_tokens: 4096
      temperature: 0.3

  letta-memory:
    description: "Long-term memory agent"
    adapter: letta
    capabilities:
      - semantic_memory
      - cross_session_learning
    verification:
      command: "curl -s http://localhost:8283/v1/health"
      expected: "ok"

consensus:
  strategies:
    - majority
    - supermajority
    - unanimous
  default: majority
  quorum_size: 3

infrastructure:
  beads_db: "~/.beads/beads.db"
  worktree_base: "/tmp/ralph-worktrees"
  log_level: INFO
```

```python
# Generate from manifest
from pathlib import Path

import yaml

def load_manifest(path: Path = Path("blackice-manifest.yaml")) -> dict:
    return yaml.safe_load(path.read_text())

def generate_agent_registry(manifest: dict) -> str:
    """Generate Python code for the agent registry."""
    code = ["# AUTO-GENERATED from blackice-manifest.yaml", ""]
    code.append("AGENTS = {")
    for name, config in manifest["agents"].items():
        code.append(f"    '{name}': {{")
        code.append(f"        'adapter': '{config['adapter']}',")
        code.append(f"        'model': {config.get('model')!r},")  # not every agent declares a model (e.g. letta-memory)
        code.append(f"        'capabilities': {config['capabilities']},")
        code.append("    },")
    code.append("}")
    return "\n".join(code)
```

Effort: Medium - restructure config loading
Verdict: YES - Single source of truth is powerful.
What it is: Single command that verifies entire stack is operational.
Current BLACKICE approach: Must check each service manually.
Why adopt: One command to answer "is everything working?"
Implementation sketch:
```python
import asyncio
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Literal

@dataclass
class HealthCheck:
    name: str
    status: Literal["pass", "fail", "warn"]
    message: str
    latency_ms: float | None = None

class DoctorCommand:
    """Unified health check for the BLACKICE stack."""

    def __init__(self, manifest: dict):
        self.manifest = manifest

    async def check_all(self) -> list[HealthCheck]:
        checks = []
        # Check all agents from the manifest
        for name, config in self.manifest["agents"].items():
            checks.append(await self._check_agent(name, config))
        # Check infrastructure
        checks.append(await self._check_beads())
        checks.append(await self._check_worktrees())
        return checks

    async def _check_agent(self, name: str, config: dict) -> HealthCheck:
        verification = config.get("verification", {})
        command = verification.get("command")
        expected = verification.get("expected")
        if not command:
            return HealthCheck(name, "warn", "No verification command defined")
        try:
            start = asyncio.get_event_loop().time()
            result = subprocess.run(
                command, shell=True, capture_output=True, timeout=5
            )
            latency = (asyncio.get_event_loop().time() - start) * 1000
            output = result.stdout.decode().strip()
            if expected and output == expected:
                return HealthCheck(name, "pass", f"Healthy ({latency:.0f}ms)", latency)
            elif result.returncode == 0:
                return HealthCheck(name, "pass", f"Running ({latency:.0f}ms)", latency)
            else:
                return HealthCheck(name, "fail", result.stderr.decode()[:100])
        except Exception as e:
            return HealthCheck(name, "fail", str(e)[:100])

    async def _check_beads(self) -> HealthCheck:
        db_path = Path(self.manifest["infrastructure"]["beads_db"]).expanduser()
        if db_path.exists():
            size_mb = db_path.stat().st_size / 1024 / 1024
            return HealthCheck("beads", "pass", f"OK ({size_mb:.1f} MB)")
        return HealthCheck("beads", "fail", f"Database not found: {db_path}")

# CLI usage
# $ blackice doctor
# ┌─────────────────────────────────────────────────────┐
# │ BLACKICE Health Check                               │
# ├──────────────┬────────┬─────────────────────────────┤
# │ Component    │ Status │ Details                     │
# ├──────────────┼────────┼─────────────────────────────┤
# │ claude-coder │ ✓ PASS │ Healthy (45ms)              │
# │ ollama-fast  │ ✓ PASS │ Running (12ms)              │
# │ letta-memory │ ✓ PASS │ Healthy (23ms)              │
# │ beads        │ ✓ PASS │ OK (156.2 MB)               │
# │ worktrees    │ ✓ PASS │ 3 active, 12 available      │
# └──────────────┴────────┴─────────────────────────────┘
```

Effort: Low - straightforward implementation
Verdict: YES - Essential for operations.
What it is: Setup commands that are safe to re-run. Interrupted runs resume.
Current BLACKICE approach: Manual setup, no state tracking.
Why adopt: Reliable deployment. Don't break things on re-run.
Implementation sketch:
```python
import json
from dataclasses import asdict, dataclass
from datetime import datetime
from pathlib import Path
from typing import Callable

@dataclass
class SetupState:
    completed_steps: list[str]
    last_step: str | None
    started_at: datetime
    completed_at: datetime | None

class IdempotentSetup:
    """Setup that tracks progress and resumes safely."""

    STATE_FILE = Path("~/.blackice/setup-state.json").expanduser()

    def __init__(self):
        self.state = self._load_state()

    def _load_state(self) -> SetupState:
        if self.STATE_FILE.exists():
            data = json.loads(self.STATE_FILE.read_text())
            return SetupState(**data)
        return SetupState([], None, datetime.now(), None)

    def _save_state(self):
        self.STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
        # default=str serializes the datetime fields
        self.STATE_FILE.write_text(json.dumps(asdict(self.state), default=str))

    async def run_step(self, step_id: str, action: Callable):
        """Run a step only if not already completed."""
        if step_id in self.state.completed_steps:
            print(f"⏭️ Skipping {step_id} (already done)")
            return
        print(f"▶️ Running {step_id}...")
        self.state.last_step = step_id
        self._save_state()
        try:
            await action()
            self.state.completed_steps.append(step_id)
            self._save_state()
            print(f"✓ Completed {step_id}")
        except Exception as e:
            print(f"✗ Failed {step_id}: {e}")
            raise

# Usage
setup = IdempotentSetup()
await setup.run_step("install_ollama", install_ollama)
await setup.run_step("pull_models", pull_models)
await setup.run_step("init_beads", init_beads)
await setup.run_step("create_worktrees", create_worktrees)
```

Effort: Low - simple state file
Verdict: YES - Professional deployment experience.
What it is: Verify checksums of any downloaded scripts/models before execution.
Current BLACKICE approach: Trust upstream sources.
Why adopt: Defense in depth. Catch supply chain attacks.
Implementation sketch:
```yaml
# checksums.yaml
resources:
  ollama-install:
    url: "https://ollama.com/install.sh"
    sha256: "abc123..."
  litellm-config:
    url: "https://raw.githubusercontent.com/.../litellm.yaml"
    sha256: "def456..."
```

```python
import hashlib
from pathlib import Path

import httpx
import yaml

class SecurityError(Exception):
    """Raised when a downloaded resource fails verification."""

class VerifiedDownloader:
    def __init__(self, checksums_file: Path):
        self.checksums = yaml.safe_load(checksums_file.read_text())

    async def download(self, resource_id: str) -> bytes:
        resource = self.checksums["resources"][resource_id]
        url = resource["url"]
        expected_sha = resource["sha256"]
        async with httpx.AsyncClient() as client:
            response = await client.get(url)
            content = response.content
        actual_sha = hashlib.sha256(content).hexdigest()
        if actual_sha != expected_sha:
            raise SecurityError(
                f"Checksum mismatch for {resource_id}!\n"
                f"Expected: {expected_sha}\n"
                f"Got: {actual_sha}\n"
                f"Possible supply chain attack!"
            )
        return content
```

Effort: Low
Verdict: YES - Security best practice.
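Seeding and rotating the pinned digests is the only operational chore this adds. A small helper for computing an entry after manually reviewing a download (helper names are hypothetical, not part of any existing tool):

```python
import hashlib

def sha256_of_bytes(content: bytes) -> str:
    """Hex digest used for pinning entries in checksums.yaml."""
    return hashlib.sha256(content).hexdigest()

def pin_entry(resource_id: str, url: str, content: bytes) -> dict:
    """Build a checksums.yaml-shaped entry for a freshly reviewed download."""
    return {resource_id: {"url": url, "sha256": sha256_of_bytes(content)}}
```

The same digest can be cross-checked from a shell with `sha256sum` before committing the entry.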
What it is: Generate CLI handlers, documentation, and boilerplate from manifest.
Current BLACKICE approach: Hand-written CLI.
Why adopt: Consistency. Change manifest → CLI updates automatically.
Implementation sketch:
```python
def generate_cli_commands(manifest: dict) -> str:
    """Generate a Click CLI from the manifest."""
    code = [
        "# AUTO-GENERATED - do not edit",
        "import click",
        "",
        "@click.group()",
        "def cli():",
        '    """BLACKICE - Autonomous Coding System"""',
        "    pass",
        "",
    ]
    # Generate one command per agent
    for name, config in manifest["agents"].items():
        code.append("@cli.command()")
        code.append('@click.option("--prompt", required=True)')
        code.append(f"def {name.replace('-', '_')}(prompt: str):")
        code.append(f'    """Run task using {config["description"]}"""')
        code.append(f'    run_agent("{name}", prompt)')
        code.append("")
    return "\n".join(code)

# Generate: python -m blackice.codegen
# Output: integrations/ralph/cli_generated.py
```

Effort: Medium - requires build step
Verdict: MAYBE - Nice but not essential.
Why skip: BLACKICE should remain cross-platform. Docker handles platform abstraction.
Why skip: Enterprise users prefer CLI/IaC. Wizard is good for beginners but BLACKICE targets developers.
Why skip: BLACKICE is focused. Don't bundle unrelated dev tools.
| Feature | Worth Adopting? | Effort | Priority |
|---|---|---|---|
| blackice doctor | YES | Low | High |
| Manifest-Driven Registry | YES | Medium | High |
| Idempotent Setup | YES | Low | Medium |
| SHA256 Verification | YES | Low | Medium |
| Code Generation | MAYBE | Medium | Low |
<!-- Source Gist 2 of 19: de7863549ca0366c5fdaa6683f07d595 -->
MassGen Ideas Worth Adopting for BLACKICE
Ideas from MassGen that could improve BLACKICE.
MassGen is a terminal-based multi-agent scaling system that orchestrates frontier models to collaborate like a "parallel study group."
| Aspect | MassGen | BLACKICE |
|---|---|---|
| Focus | Parallel reasoning convergence | Iterate-until-success with consensus |
| Pattern | Study group (observe & refine) | Ralph Loop + voting |
| Platform | Terminal (Python) | Python CLI |
| Model Support | 15+ providers | Claude, Ollama, Letta |
| Coordination | Notification hub | Message broker + consensus |
| State | JSON status files | Beads event store |
- Cross-Model Synergy - Different models attack same problem simultaneously
- Intelligence Sharing - Agents broadcast observations in real-time
- Convergence Detection - Natural consensus without forced agreement
- Adaptive Restart - Agents pivot when receiving novel insights
- OpenAI-Compatible API - Expose orchestration as /v1/chat/completions
What it is: Multiple models solve the same problem in parallel, each with different strategies.
Current BLACKICE approach: Sequential model selection via LLMRouter.
Why adopt: Different models have different strengths. Claude is good at architecture, Ollama/Qwen is fast for iteration, GPT-4 catches edge cases.
Implementation sketch:
```python
@dataclass
class AttackStrategy:
    model: str
    approach: Literal["tdd", "doc_first", "refactor", "spike"]
    prompt_modifier: str

@dataclass
class ParallelAttack:
    task: Task
    strategies: list[AttackStrategy]

class CrossModelAttacker:
    """Attack a problem with multiple models simultaneously."""

    async def attack(self, task: Task) -> list[Solution]:
        strategies = [
            AttackStrategy("claude-sonnet-4-20250514", "tdd",
                           "Write tests first, then implement."),
            AttackStrategy("ollama/qwen2.5-coder", "spike",
                           "Quick prototype to explore solution space."),
            AttackStrategy("gpt-4o", "doc_first",
                           "Document the interface, then implement."),
        ]
        # Launch all attacks in parallel
        attempts = [
            self._execute_strategy(task, strategy)
            for strategy in strategies
        ]
        solutions = await asyncio.gather(*attempts)
        # Use existing consensus to pick the best
        return await self.consensus.vote(solutions)
```

Effort: Medium - leverages existing parallel infrastructure
Verdict: YES - Natural extension of current multi-model support.
What it is: Agents publish findings to a shared hub. Other agents can subscribe and react.
Current BLACKICE approach: Direct message broker (request/reply).
Why adopt: Organic knowledge distribution. Agent A finds a bug, Agent B immediately knows.
Implementation sketch:
```python
@dataclass
class Notification:
    agent_id: str
    notification_type: Literal["finding", "blocker", "insight", "partial_solution"]
    content: str
    timestamp: datetime
    relevance_tags: list[str]

class NotificationHub:
    """Pub/sub for agent discoveries."""

    def __init__(self, beads: BeadsClient):
        self.beads = beads
        self.subscribers: dict[str, list[Callable]] = {}

    async def publish(self, notification: Notification):
        # Persist to Beads for replay
        await self.beads.append_event(
            "notification_published",
            notification.__dict__
        )
        # Notify subscribers
        for tag in notification.relevance_tags:
            for callback in self.subscribers.get(tag, []):
                await callback(notification)

    async def subscribe(self, agent_id: str, tags: list[str], callback: Callable):
        for tag in tags:
            self.subscribers.setdefault(tag, []).append(callback)

# Agent usage
async def on_finding(notification: Notification):
    if notification.notification_type == "blocker":
        # Pivot strategy based on a peer's blocker
        await agent.pivot_strategy(notification.content)

await hub.subscribe("agent-1", ["python", "testing"], on_finding)
```

Effort: Low-Medium - extends existing message broker
Verdict: YES - More natural than explicit message passing.
What it is: System detects when agents naturally reach similar conclusions without forced voting.
Current BLACKICE approach: Explicit consensus voting (majority, supermajority, etc.).
Why adopt: Less overhead when agents already agree. Save voting for real disagreements.
Implementation sketch:
```python
@dataclass
class ConvergenceState:
    solutions: list[Solution]
    similarity_matrix: dict[tuple[str, str], float]
    converged: bool
    convergence_score: float

class ConvergenceDetector:
    """Detect natural consensus before forcing a vote."""

    def __init__(self, threshold: float = 0.85):
        self.threshold = threshold
        self.embedding_model = "text-embedding-3-small"

    async def check_convergence(self, solutions: list[Solution]) -> ConvergenceState:
        # Embed all solutions
        embeddings = await self._embed_solutions(solutions)
        # Calculate pairwise similarity
        similarity_matrix = {}
        for i, sol_a in enumerate(solutions):
            for j, sol_b in enumerate(solutions[i + 1:], i + 1):
                similarity = cosine_similarity(embeddings[i], embeddings[j])
                similarity_matrix[(sol_a.agent_id, sol_b.agent_id)] = similarity
        # Converged if the average pairwise similarity clears the threshold
        avg_similarity = sum(similarity_matrix.values()) / len(similarity_matrix)
        converged = avg_similarity >= self.threshold
        return ConvergenceState(
            solutions=solutions,
            similarity_matrix=similarity_matrix,
            converged=converged,
            convergence_score=avg_similarity,
        )

    async def get_consensus(self, solutions: list[Solution]) -> Solution:
        state = await self.check_convergence(solutions)
        if state.converged:
            # Natural consensus - pick any (or merge)
            return await self._merge_similar(solutions)
        # Fall back to explicit voting
        return await self.consensus_engine.vote(solutions)
```

Effort: Medium - requires embedding infrastructure
Verdict: YES - More efficient than always voting.
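The detector above assumes a `cosine_similarity` helper; a dependency-free version for plain float-list embeddings could be:

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two embedding vectors (0 for a zero vector)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)
```

In practice numpy would be faster for batch comparison, but embeddings come back from the API as plain lists, so this works as-is for a handful of solutions.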
What it is: Expose entire multi-agent system as standard /v1/chat/completions endpoint.
Current BLACKICE approach: CLI only (ralph run).
Why adopt: Any tool expecting OpenAI API can use BLACKICE. IDE plugins, scripts, other agents.
Implementation sketch:
```python
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class ChatCompletionRequest(BaseModel):
    model: str  # Ignored - BLACKICE does its own routing
    messages: list[dict]
    temperature: float = 0.7

class ChatCompletionResponse(BaseModel):
    id: str
    choices: list[dict]
    usage: dict

@app.post("/v1/chat/completions")
async def chat_completion(request: ChatCompletionRequest) -> ChatCompletionResponse:
    # Extract the task from the message history
    task = extract_task(request.messages)
    # Run through the full EnterpriseFlywheel
    result = await flywheel.execute_task(task)
    # Format as an OpenAI-style response
    return ChatCompletionResponse(
        id=result.task_id,
        choices=[{
            "message": {"role": "assistant", "content": result.output},
            "finish_reason": "stop",
        }],
        usage={
            "prompt_tokens": result.metrics.prompt_tokens,
            "completion_tokens": result.metrics.completion_tokens,
            "total_tokens": result.metrics.total_tokens,
        },
    )

# Run with: uvicorn blackice.api:app --port 8080
```

Effort: Low - wrapper around existing CLI
Verdict: YES - Unlocks ecosystem integration.
What it is: Real-time display of agent progress and decision-making.
Current BLACKICE approach: CLI output, logs.
Why adopt: See what's happening during long runs. Debug stuck agents.
Implementation sketch:
```python
# Terminal UI with rich
from rich.console import Console
from rich.live import Live
from rich.table import Table

class LiveDashboard:
    def __init__(self, flywheel: EnterpriseFlywheel):
        self.flywheel = flywheel
        self.console = Console()

    def generate_table(self) -> Table:
        table = Table(title="BLACKICE Agent Status")
        table.add_column("Agent")
        table.add_column("Model")
        table.add_column("Status")
        table.add_column("Iteration")
        table.add_column("Tokens")
        for agent in self.flywheel.active_agents:
            table.add_row(
                agent.id,
                agent.model,
                agent.status,
                str(agent.iteration),
                f"{agent.tokens_used:,}",
            )
        return table

    async def run(self, task: Task):
        with Live(self.generate_table(), refresh_per_second=2) as live:
            async for event in self.flywheel.execute_stream(task):
                live.update(self.generate_table())
```

Effort: Low - uses existing metrics
Verdict: YES - Essential for debugging.
Why skip: MassGen's strength is 15+ providers, but BLACKICE intentionally limits to Claude + Ollama + Letta for simplicity and control. Adding more providers adds complexity without clear benefit.
Why skip: MassGen uses JSON files. BLACKICE's Beads event store is more robust for crash recovery and audit trails.
Why skip: Already have CLI. A web dashboard (see Superset gist) would be more valuable than another terminal UI.
| Feature | Worth Adopting? | Effort | Priority |
|---|---|---|---|
| OpenAI-Compatible API | YES | Low | High |
| Live Progress Visualization | YES | Low | High |
| Cross-Model Attack | YES | Medium | Medium |
| Notification Hub | YES | Low-Medium | Medium |
| Convergence Detection | YES | Medium | Low |
<!-- Source Gist 3 of 19: d33731cdbc2f13b7eb602cdfc6761e1d -->
Superset Ideas Worth Adopting for BLACKICE
Ideas from Superset that could improve BLACKICE.
Superset is a desktop terminal application for managing 10+ parallel CLI coding agents.
| Aspect | Superset | BLACKICE |
|---|---|---|
| Focus | Terminal UI for parallel agents | Backend orchestration |
| Platform | Electron desktop app (macOS) | Python CLI |
| Workspace Isolation | Git worktrees | Git worktrees (same!) |
| Agent Support | Any CLI agent | Claude, Ollama, Letta, Codex |
| Tech Stack | Electron, React, Bun, tRPC | Python, SQLite |
- Parallel Agent Management - Run 10+ CLI agents simultaneously
- Git Worktree Isolation - Each task gets isolated workspace
- Built-in Diff Viewer - Review agent changes visually
- Status Monitoring - Notifications when agents complete
- Config-Driven Setup - .superset/config.json for automation
What it is: Visual desktop app for managing agents instead of CLI-only.
Current BLACKICE approach: CLI only (ralph run, ralph status).
Why adopt: Visual management of 10+ agents is easier than CLI cycling.
Implementation approach:
Option A: Build Electron app (like Superset)
Option B: Build web dashboard (simpler, cross-platform)
Option C: Adopt Superset directly as BLACKICE frontend
Effort: High (new app) or Low (integrate with Superset)
Verdict: MAYBE - Consider integrating with Superset rather than building from scratch.
What it is: Visual diff editor for reviewing agent changes before merge.
Current BLACKICE approach: Relies on external tools (git diff, IDE).
Why adopt: Faster review loop → faster iteration.
Implementation sketch:
```shell
# Add to the CLI
ralph diff <task_id>            # Show diff for a task
ralph diff --interactive        # Interactive diff review
ralph diff --accept <task_id>   # Accept changes
ralph diff --reject <task_id>   # Reject changes

# Or a web UI
GET  /api/tasks/<id>/diff       # Return diff JSON
POST /api/tasks/<id>/accept     # Accept changes
```

Effort: Medium
Verdict: YES - Improves review workflow.
What it is: Desktop notifications when agents complete tasks.
Current BLACKICE approach: Must poll ralph status manually.
Why adopt: Don't miss completed work while multitasking.
Implementation sketch:
```python
# macOS
import subprocess

def notify(title: str, message: str):
    subprocess.run([
        "osascript", "-e",
        f'display notification "{message}" with title "{title}"',
    ])

# Cross-platform with plyer
from plyer import notification
notification.notify(title="BLACKICE", message="Task completed!")

# Or a websocket event for a web UI
async def broadcast_completion(task_id: str):
    await websocket.send_json({"event": "task_complete", "task_id": task_id})
```

Effort: Low
Verdict: YES - Easy win.
What it is: .superset/config.json automates environment setup per project.
Current BLACKICE approach: Manual config via ~/.ralph/config.yaml.
Why adopt: Project-specific configs for different codebases.
Implementation sketch:
```yaml
# .blackice/config.yaml (per-project)
project:
  name: "my-api"
  default_model: "claude-3-5-sonnet"
worktree:
  base_branch: "main"
  prefix: "blackice-"
setup:
  pre_task:
    - "npm install"
    - "docker compose up -d"
  post_task:
    - "npm test"
```

Effort: Low
Verdict: YES - Per-project configs are useful.
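Resolution order is worth pinning down. A reasonable sketch (the precedence rule is an assumption, not existing BLACKICE behavior): merge the per-project `.blackice/config.yaml` over the global `~/.ralph/config.yaml`, with project keys winning:

```python
from pathlib import Path

def deep_merge(base: dict, override: dict) -> dict:
    """Recursively merge override into base; override wins on conflicts."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged

def load_config(project_dir: Path) -> dict:
    """Global config first, then the per-project file layered on top."""
    import yaml  # PyYAML; deferred so deep_merge stays dependency-free
    global_path = Path("~/.ralph/config.yaml").expanduser()
    project_path = project_dir / ".blackice" / "config.yaml"
    config: dict = {}
    for path in (global_path, project_path):
        if path.exists():
            config = deep_merge(config, yaml.safe_load(path.read_text()) or {})
    return config
```

Deep-merging (rather than replacing whole top-level sections) lets a project override just `project.default_model` while inheriting the global `worktree` settings.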
Why skip: BLACKICE is Python-based. Building a full Electron app is overkill when:
- A web dashboard would work better
- tmux UI (like Gas Town) is simpler
- Could integrate with Superset instead of competing
Why skip: BLACKICE should remain cross-platform.
| Feature | Worth Adopting? | Effort | Priority |
|---|---|---|---|
| Status Notifications | YES | Low | High |
| Built-in Diff Viewer | YES | Medium | Medium |
| Per-Project Config | YES | Low | High |
| Desktop UI | MAYBE | High | Low |
Instead of building UI from scratch, consider:
- BLACKICE as backend → Superset as frontend
- Expose BLACKICE via tRPC or REST API
- Let Superset manage the visual layer
```
┌─────────────────────────────────────┐
│           Superset (UI)             │
│   Electron + React + TailwindCSS    │
└─────────────────┬───────────────────┘
                  │ tRPC / REST
                  ▼
┌─────────────────────────────────────┐
│         BLACKICE (Backend)          │
│   EnterpriseFlywheel + Consensus    │
└─────────────────────────────────────┘
```
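If BLACKICE exposes status over REST for such a frontend, the payload can stay a plain JSON snapshot that any UI polls. A minimal sketch (field names hypothetical, not an existing BLACKICE API):

```python
from dataclasses import asdict, dataclass

@dataclass
class AgentStatus:
    id: str
    model: str
    status: str
    iteration: int

def status_payload(agents: list[AgentStatus]) -> dict:
    """JSON-serializable snapshot a web frontend (e.g. Superset) could poll."""
    return {
        "agents": [asdict(a) for a in agents],
        "active": sum(1 for a in agents if a.status == "running"),
    }
```

The same dict can then be returned from a FastAPI route or pushed over a websocket; keeping it framework-agnostic leaves the tRPC-vs-REST choice to the frontend.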
<!-- Source Gist 4 of 19: 17644e057a159c39b9d50c555cefd418 -->
Gas Town ideas worth adopting for BLACKICE
Ideas from Steve Yegge's Gas Town that could improve BLACKICE.
| System | BLACKICE | Gas Town |
|---|---|---|
| Core Pattern | Ralph Loop (iterate until success) | MEOW (molecular workflows) |
| Language | Python (53K lines) | Go (75K lines) |
| Strength | Consensus, observability, multi-LLM | Workflow DSL, visual UI, self-healing |
What it is: "If there is work on your hook, YOU MUST RUN IT."
Every agent has a hook - a persistent pointer to work they must execute on startup. This guarantees continuation across crashes and context window exhaustion.
Current BLACKICE approach: Beads event replay - reconstructs state from event history.
Why adopt: GUPP is simpler. Instead of replaying events, just check the hook and continue.
Implementation sketch:
```python
@dataclass
class AgentHook:
    agent_id: str
    current_task_id: str | None
    current_step: int
    molecule_id: str | None  # workflow chain

class EnterpriseFlywheel:
    async def on_agent_start(self, agent_id: str):
        hook = await self.beads.get_hook(agent_id)
        if hook.current_task_id:
            # GUPP: must run hooked work
            await self.continue_task(hook)
```

Effort: Medium - adds hook table to Beads, modify agent startup
What it is: Workflow algebra with composable primitives:
- Beads → atomic work units
- Epics → beads with children
- Molecules → chained workflow steps
- Protomolecules → workflow templates
- Formulas → TOML source that compiles to molecules
- Wisps → ephemeral molecules (not persisted to git)
Current BLACKICE approach: DAG executor with hardcoded workflows.
Why adopt: Define workflows as data, not code. Compose, template, reuse.
Example Formula (TOML):
```toml
[formula]
name = "feature-implementation"
description = "Standard feature workflow"

[[steps]]
id = "design"
name = "Design the feature"
prompt = "Create a design document for: {feature_description}"

[[steps]]
id = "implement"
name = "Implement the feature"
depends_on = ["design"]
prompt = "Implement based on design: {design.output}"

[[steps]]
id = "test"
name = "Write tests"
depends_on = ["implement"]
prompt = "Write tests for: {implement.files_changed}"

[[steps]]
id = "review"
name = "Code review"
depends_on = ["test"]
prompt = "Review implementation against design"
```

Implementation sketch:
```python
@dataclass
class MoleculeStep:
    id: str
    name: str
    prompt: str
    depends_on: list[str]
    status: Literal["pending", "running", "done", "failed"]

@dataclass
class Molecule:
    id: str
    formula_name: str
    steps: list[MoleculeStep]
    variables: dict[str, Any]

    def get_step(self, step_id: str) -> MoleculeStep:
        return next(s for s in self.steps if s.id == step_id)

    def next_step(self) -> MoleculeStep | None:
        """Return the next runnable step based on dependencies."""
        for step in self.steps:
            if step.status == "pending":
                deps_done = all(
                    self.get_step(d).status == "done"
                    for d in step.depends_on
                )
                if deps_done:
                    return step
        return None
```

Effort: High - new subsystem, but very powerful
What it is: Background agents that continuously monitor and fix issues:
- Witness → monitors workers, unsticks stuck agents
- Deacon → daemon that propagates "do your job" signals
- Dogs → helpers that handle maintenance tasks
Current BLACKICE approach: No self-healing. Manual intervention required.
Why adopt: System keeps running without human babysitting.
Implementation sketch:
```python
class PatrolAgent:
    """Background agent that runs a patrol loop."""

    async def patrol(self):
        while True:
            # Check system health
            stuck_agents = await self.find_stuck_agents()
            for agent in stuck_agents:
                await self.nudge_agent(agent)
            # Check the merge queue
            pending_merges = await self.check_merge_queue()
            if pending_merges:
                await self.process_merges(pending_merges)
            # Exponential backoff if nothing to do
            await self.sleep_with_backoff()

class WitnessAgent(PatrolAgent):
    """Monitors workers and helps them get unstuck."""

    async def find_stuck_agents(self) -> list[Agent]:
        agents = await self.beads.get_active_agents()
        stuck = []
        for agent in agents:
            last_activity = await self.beads.get_last_activity(agent.id)
            if self.is_stuck(last_activity):
                stuck.append(agent)
        return stuck

    async def nudge_agent(self, agent: Agent):
        """Send a GUPP nudge to a stuck agent."""
        await self.send_message(agent.id, "Do your job. Check your hook.")
```

Effort: Medium - add patrol loop, stuck detection heuristics
What it is: A tracking unit that bundles multiple issues/tasks together for delivery.
Instead of tracking individual tasks, track the convoy - the logical unit of work being delivered.
Current BLACKICE approach: Track individual tasks. No bundling.
Why adopt: Better visibility into "what shipped" vs "what tasks ran."
Implementation sketch:
```python
@dataclass
class Convoy:
    id: str
    name: str
    description: str
    task_ids: list[str]
    status: Literal["active", "landed", "failed"]
    started_at: datetime
    landed_at: datetime | None

    def progress(self, tasks: list[Task]) -> float:
        """Fraction of the convoy's (resolved) tasks that are done."""
        done = sum(1 for t in tasks if t.status == "done")
        return done / len(tasks) if tasks else 0.0

class ConvoyTracker:
    async def create_convoy(self, name: str, task_ids: list[str]) -> Convoy:
        convoy = Convoy(
            id=generate_id(),
            name=name,
            description="",
            task_ids=task_ids,
            status="active",
            started_at=datetime.now(),
            landed_at=None,
        )
        await self.beads.save_convoy(convoy)
        return convoy

    async def check_convoy(self, convoy_id: str) -> Convoy:
        convoy = await self.beads.get_convoy(convoy_id)
        tasks = [await self.beads.get_task(t) for t in convoy.task_ids]
        if all(t.status == "done" for t in tasks):
            convoy.status = "landed"
            convoy.landed_at = datetime.now()
            await self.beads.save_convoy(convoy)
        return convoy
```

Effort: Low - simple wrapper around existing task tracking
What it is: Visual management of 20-30 Claude Code instances in tmux.
Current BLACKICE approach: CLI only.
Why adopt: See all agents at once, switch between them, visual monitoring.
Implementation sketch:
# gt (gas town) style commands for BLACKICE
blackice tmux start # Start tmux session with agent panes
blackice tmux status # Show all agents in split view
blackice tmux attach <agent> # Attach to specific agent
blackice tmux broadcast <msg> # Send message to all agentsEffort: Low-Medium - tmux scripting, optional feature
Why skip: BLACKICE's consensus voting is more flexible. Gas Town's Mayor is a single point of decision-making. Consensus allows multiple agents to vote on solutions, catching more errors.
Why skip: BLACKICE's Beads event replay is more deterministic and auditable. NDI is "eventually correct" which is fine for vibe coding but not for enterprise use cases.
Why skip: BLACKICE's multi-LLM support (Claude, Ollama, Letta, Codex) is a strength. Don't regress to single-provider lock-in.
| Priority | Feature | Effort | Impact |
|---|---|---|---|
| 1 | Convoys | Low | High - better tracking |
| 2 | GUPP | Medium | High - simpler recovery |
| 3 | Patrol Agents | Medium | High - self-healing |
| 4 | MEOW | High | Very High - workflow DSL |
| 5 | tmux UI | Low | Medium - nice to have |
- Gas Town GitHub
- Welcome to Gas Town (Yegge's blog)
- Beads (Yegge's issue tracker)
- Ralph Pattern
- BLACKICE README
<!-- Source Gist 5 of 19: eff6b4d7204aa95d5b18476569c39682 -->
ClaudeBar Ideas for BLACKICE
Ideas from ClaudeBar for BLACKICE.
A macOS menu bar app that monitors AI coding assistant quota usage across multiple providers with clean architecture.
| Aspect | ClaudeBar | BLACKICE |
|---|---|---|
| Focus | Quota monitoring | Iterate-until-success |
| Platform | macOS (SwiftUI) | Python CLI |
| Providers | Claude, Codex, Gemini, Copilot, etc. | Claude, Ollama, Letta |
| Architecture | Protocol-based DI | Adapter pattern |
- Multi-Provider Monitoring - Track all AI tool quotas in one place
- Protocol-Based DI - Injectable, testable abstractions
- Repository Pattern - Clean data access layer
- Chicago School TDD - Test state changes, not method calls
- Threshold Alerts - Color-coded health indicators
What it is: Track and display usage across all providers.
Current BLACKICE approach: CostTracker tracks tokens but no dashboard.
Why adopt: Know when you're running low. Plan budget.
Implementation sketch:
```python
import logging
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Protocol

logger = logging.getLogger(__name__)

class QuotaStatus(Enum):
    HEALTHY = "healthy"    # >50%
    WARNING = "warning"    # 20-50%
    CRITICAL = "critical"  # <20%
    DEPLETED = "depleted"  # 0%

@dataclass
class ProviderQuota:
    provider: str
    used: int
    limit: int
    unit: str  # "tokens", "requests", "minutes"
    reset_at: datetime | None

    @property
    def remaining(self) -> int:
        return max(0, self.limit - self.used)

    @property
    def percentage(self) -> float:
        if self.limit == 0:
            return 0
        return (self.remaining / self.limit) * 100

    @property
    def status(self) -> QuotaStatus:
        pct = self.percentage
        if pct == 0:
            return QuotaStatus.DEPLETED
        if pct < 20:
            return QuotaStatus.CRITICAL
        if pct < 50:
            return QuotaStatus.WARNING
        return QuotaStatus.HEALTHY

class QuotaMonitor:
    """Monitor quotas across all providers."""

    def __init__(self, providers: list["ProviderProbe"]):
        self.providers = {p.name: p for p in providers}
        self._quotas: dict[str, ProviderQuota] = {}

    async def refresh_all(self):
        """Fetch current quotas from all providers."""
        for name, provider in self.providers.items():
            try:
                quota = await provider.get_quota()
                self._quotas[name] = quota
            except Exception as e:
                logger.warning(f"Failed to fetch quota for {name}: {e}")

    def get_status(self) -> dict[str, ProviderQuota]:
        """Get current quota status."""
        return self._quotas.copy()

    def get_summary(self) -> str:
        """Get human-readable summary."""
        lines = ["## Provider Quotas", ""]
        for name, quota in sorted(self._quotas.items()):
            icon = {
                QuotaStatus.HEALTHY: "🟢",
                QuotaStatus.WARNING: "🟡",
                QuotaStatus.CRITICAL: "🔴",
                QuotaStatus.DEPLETED: "⚫",
            }[quota.status]
            lines.append(
                f"{icon} {name}: {quota.remaining:,}/{quota.limit:,} "
                f"{quota.unit} ({quota.percentage:.0f}%)"
            )
        return "\n".join(lines)

    def can_use(self, provider: str, amount: int = 1) -> bool:
        """Check if provider has enough quota."""
        quota = self._quotas.get(provider)
        if not quota:
            return True  # Unknown = allow
        return quota.remaining >= amount

# Provider probe interface
class ProviderProbe(Protocol):
    name: str

    async def get_quota(self) -> ProviderQuota:
        """Fetch current quota from provider."""
        ...

# Example: Claude probe
class ClaudeProbe:
    name = "claude"

    async def get_quota(self) -> ProviderQuota:
        # Parse from Claude's usage endpoint
        response = await self._fetch_usage()
        return ProviderQuota(
            provider="claude",
            used=response["tokens_used"],
            limit=response["tokens_limit"],
            unit="tokens",
            reset_at=datetime.fromisoformat(response["reset_at"]),
        )
```
Effort: Medium
Verdict: YES - Essential for budget management.
What it is: Define interfaces as protocols, inject implementations.
Current BLACKICE approach: Direct class dependencies.
Why adopt: Testable. Swappable implementations.
Implementation sketch:
```python
import sqlite3
from dataclasses import dataclass
from pathlib import Path
from typing import Protocol, runtime_checkable

@runtime_checkable
class TaskStorage(Protocol):
    """Protocol for task persistence."""
    async def save(self, task: Task) -> None: ...
    async def get(self, task_id: str) -> Task | None: ...
    async def list(self, status: str | None = None) -> list[Task]: ...

@runtime_checkable
class LLMProvider(Protocol):
    """Protocol for LLM interactions."""
    async def generate(self, prompt: str, **kwargs) -> str: ...
    async def get_quota(self) -> ProviderQuota: ...

@runtime_checkable
class EventStore(Protocol):
    """Protocol for event persistence."""
    async def append(self, event: Event) -> None: ...
    async def get_events(self, entity_id: str) -> list[Event]: ...

# Implementations
class SQLiteTaskStorage:
    """SQLite implementation of TaskStorage."""
    def __init__(self, db_path: Path):
        self.db = sqlite3.connect(db_path)

    async def save(self, task: Task) -> None:
        ...  # Implementation

class InMemoryTaskStorage:
    """In-memory implementation for testing."""
    def __init__(self):
        self._tasks: dict[str, Task] = {}

    async def save(self, task: Task) -> None:
        self._tasks[task.id] = task

    async def get(self, task_id: str) -> Task | None:
        return self._tasks.get(task_id)

# Dependency injection container
@dataclass
class Dependencies:
    task_storage: TaskStorage
    llm_provider: LLMProvider
    event_store: EventStore

def create_production_deps() -> Dependencies:
    return Dependencies(
        task_storage=SQLiteTaskStorage(Path("~/.blackice/tasks.db").expanduser()),
        llm_provider=ClaudeProvider(),
        event_store=BeadsEventStore(Path("~/.beads/beads.db").expanduser()),
    )

def create_test_deps() -> Dependencies:
    return Dependencies(
        task_storage=InMemoryTaskStorage(),
        llm_provider=MockLLMProvider(),
        event_store=InMemoryEventStore(),
    )

# Usage in flywheel
class EnterpriseFlywheel:
    def __init__(self, deps: Dependencies):
        self.storage = deps.task_storage
        self.llm = deps.llm_provider
        self.events = deps.event_store
```
Effort: Medium
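The key property of `runtime_checkable` protocols is structural conformance: an implementation never inherits from the protocol, yet `isinstance` still passes. A minimal sketch (sync methods for brevity; note `isinstance` only checks that the members exist, not their signatures):

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class TaskStorage(Protocol):
    """Structural interface: any class with a matching save() conforms."""
    def save(self, task: dict) -> None: ...

class InMemoryTaskStorage:
    # No inheritance from TaskStorage -- conformance is purely structural.
    def __init__(self):
        self._tasks: dict[str, dict] = {}

    def save(self, task: dict) -> None:
        self._tasks[task["id"]] = task

storage = InMemoryTaskStorage()
print(isinstance(storage, TaskStorage))  # True
storage.save({"id": "1", "description": "hello"})
print(len(storage._tasks))  # 1
```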
Verdict: YES - Better testability.
What it is: Test observable outcomes, not implementation details.
Current BLACKICE approach: Mix of state and mock-based tests.
Why adopt: Less brittle tests. Focus on behavior.
Implementation sketch:
```python
import pytest

# BAD: Testing implementation details (London School)
class TestFlywheelBad:
    def test_execute_calls_llm(self, mocker):
        # Fragile: breaks if implementation changes
        mock_llm = mocker.patch("blackice.llm.generate")
        flywheel = Flywheel()
        flywheel.execute(task)
        mock_llm.assert_called_once()  # ❌ Testing HOW, not WHAT

# GOOD: Testing observable outcomes (Chicago School)
class TestFlywheelGood:
    async def test_execute_produces_result(self, deps):
        # Robust: tests observable outcome
        flywheel = Flywheel(deps)
        task = Task(id="1", description="Write hello world")
        result = await flywheel.execute(task)
        # ✅ Testing WHAT happened, not HOW
        assert result.status == "success"
        assert "hello" in result.output.lower()
        assert await deps.task_storage.get("1") is not None

    async def test_execute_persists_events(self, deps):
        flywheel = Flywheel(deps)
        task = Task(id="1", description="Write hello world")
        await flywheel.execute(task)
        # ✅ Testing observable state change
        events = await deps.event_store.get_events("1")
        assert len(events) >= 2  # At least start and complete
        assert events[0].type == "task_started"
        assert events[-1].type in ("task_completed", "task_failed")

    async def test_execute_respects_budget(self, deps):
        deps.llm_provider.quota = ProviderQuota(
            provider="test", used=990, limit=1000, unit="tokens", reset_at=None
        )
        flywheel = Flywheel(deps)
        task = Task(id="1", description="Write something long")
        result = await flywheel.execute(task)
        # ✅ Testing observable behavior
        assert result.status == "failed"
        assert "budget" in result.error.lower()

# Test fixtures using dependency injection
@pytest.fixture
def deps():
    return create_test_deps()

@pytest.fixture
def flywheel(deps):
    return Flywheel(deps)
```
Effort: Low (mindset change)
Verdict: YES - Better tests.
What it is: Providers self-register capabilities.
Current BLACKICE approach: Hardcoded provider list.
Why adopt: Easy to add new providers. Plugin-friendly.
Implementation sketch:
```python
import os
from typing import Type

class ProviderRegistry:
    """Registry for LLM providers."""
    _providers: dict[str, Type[LLMProvider]] = {}

    @classmethod
    def register(cls, name: str):
        """Decorator to register a provider."""
        def decorator(provider_class: Type[LLMProvider]):
            cls._providers[name] = provider_class
            return provider_class
        return decorator

    @classmethod
    def get(cls, name: str) -> Type[LLMProvider] | None:
        return cls._providers.get(name)

    @classmethod
    def list_all(cls) -> list[str]:
        return list(cls._providers.keys())

    @classmethod
    def create(cls, name: str, **config) -> LLMProvider:
        provider_class = cls._providers.get(name)
        if not provider_class:
            raise ValueError(f"Unknown provider: {name}")
        return provider_class(**config)

# Providers self-register
@ProviderRegistry.register("claude")
class ClaudeProvider:
    def __init__(self, api_key: str | None = None, model: str = "claude-sonnet-4-20250514"):
        self.api_key = api_key or os.environ.get("ANTHROPIC_API_KEY")
        self.model = model

    async def generate(self, prompt: str, **kwargs) -> str:
        ...  # Implementation

@ProviderRegistry.register("ollama")
class OllamaProvider:
    def __init__(self, base_url: str = "http://localhost:11434", model: str = "qwen2.5-coder"):
        self.base_url = base_url
        self.model = model

    async def generate(self, prompt: str, **kwargs) -> str:
        ...  # Implementation

@ProviderRegistry.register("letta")
class LettaProvider:
    def __init__(self, base_url: str = "http://localhost:8283"):
        self.base_url = base_url

    async def generate(self, prompt: str, **kwargs) -> str:
        ...  # Implementation

# Usage
available = ProviderRegistry.list_all()  # ["claude", "ollama", "letta"]
provider = ProviderRegistry.create("claude", model="claude-opus-4-5")
```
Effort: Low
Verdict: YES - Clean extensibility.
What it is: Color-coded alerts at configurable thresholds.
Current BLACKICE approach: Log warnings only.
Why adopt: Visual status. Proactive alerts.
Implementation sketch:
```python
import asyncio
from dataclasses import dataclass
from datetime import datetime
from typing import Literal

@dataclass
class AlertThreshold:
    name: str
    operator: Literal["<", ">", "<=", ">=", "=="]
    value: float
    severity: Literal["info", "warning", "critical"]
    message_template: str

@dataclass
class Alert:
    key: str
    severity: str
    message: str
    triggered_at: datetime

DEFAULT_THRESHOLDS = [
    AlertThreshold("quota_warning", "<", 50, "warning", "Quota below 50%: {value:.0f}%"),
    AlertThreshold("quota_critical", "<", 20, "critical", "Quota critical: {value:.0f}%"),
    AlertThreshold("quota_depleted", "==", 0, "critical", "Quota depleted!"),
    AlertThreshold("error_rate_high", ">", 0.3, "warning", "Error rate high: {value:.0%}"),
    AlertThreshold("latency_high", ">", 5000, "warning", "Latency high: {value}ms"),
]

class AlertManager:
    """Manage threshold-based alerts."""
    def __init__(self, thresholds: list[AlertThreshold] | None = None):
        self.thresholds = thresholds or DEFAULT_THRESHOLDS
        self.active_alerts: dict[str, Alert] = {}

    def check(self, metric: str, value: float) -> list[Alert]:
        """Check metric against thresholds."""
        alerts = []
        for threshold in self.thresholds:
            if not self._matches(threshold, metric):
                continue
            triggered = self._evaluate(threshold, value)
            alert_key = f"{metric}:{threshold.name}"
            if triggered:
                alert = Alert(
                    key=alert_key,
                    severity=threshold.severity,
                    message=threshold.message_template.format(value=value),
                    triggered_at=datetime.now(),
                )
                self.active_alerts[alert_key] = alert
                alerts.append(alert)
            elif alert_key in self.active_alerts:
                # Alert resolved
                del self.active_alerts[alert_key]
        return alerts

    def _matches(self, threshold: AlertThreshold, metric: str) -> bool:
        # Simple heuristic (matching is left unspecified in the sketch):
        # a threshold named "quota_*" applies to any metric containing "quota"
        return threshold.name.split("_")[0] in metric

    def _evaluate(self, threshold: AlertThreshold, value: float) -> bool:
        ops = {
            "<": lambda a, b: a < b,
            ">": lambda a, b: a > b,
            "<=": lambda a, b: a <= b,
            ">=": lambda a, b: a >= b,
            "==": lambda a, b: a == b,
        }
        return ops[threshold.operator](value, threshold.value)

# Integration with monitoring
async def monitoring_loop():
    alert_manager = AlertManager()
    quota_monitor = QuotaMonitor(providers)
    while True:
        await quota_monitor.refresh_all()
        for name, quota in quota_monitor.get_status().items():
            alerts = alert_manager.check(f"{name}_quota", quota.percentage)
            for alert in alerts:
                await notify(alert)  # Desktop notification, webhook, etc.
        await asyncio.sleep(60)
```
Effort: Low
Verdict: YES - Proactive alerting.
Why skip: BLACKICE should remain cross-platform.
Why skip: BLACKICE is Python.
| Feature | Worth Adopting? | Effort | Priority |
|---|---|---|---|
| Quota Monitoring | YES | Medium | High |
| Provider Registry | YES | Low | High |
| Protocol-Based DI | YES | Medium | Medium |
| Chicago School TDD | YES | Low | Medium |
| Threshold Alerts | YES | Low | Low |
<!-- Source Gist 6 of 19: d1e2505a8ecf2bf430156b889c102dd6 -->
Quint-Code Ideas for BLACKICE
Ideas from Quint Code for BLACKICE.
Structured reasoning for AI coding tools using the First Principles Framework (FPF). Transforms chaotic AI decision-making into transparent, evidence-backed audit trails.
| Aspect | Quint Code | BLACKICE |
|---|---|---|
| Focus | Structured reasoning | Iterate-until-success |
| Method | FPF (abduction/deduction/induction) | Ralph Loop + consensus |
| State | `.quint/` directory | Beads event store |
| Output | Decision documents | Task results |
- Decision Documentation - Every choice preserved with rationale
- Hypothesis Scaffolding - Generate competing alternatives before convergence
- Evidence Lifecycle - Decay stale evidence, actualize with code changes
- Bias Auditing - Calculate confidence scores
- Q-Cycle Workflow - Q0 → Q5 structured reasoning phases
What it is: 6-phase reasoning cycle from problem to decision.
Current BLACKICE approach: Ad-hoc reasoning in prompts.
Why adopt: Consistent reasoning process. Better decisions.
Implementation sketch:
```python
import json
from dataclasses import dataclass
from enum import Enum

class QPhase(Enum):
    Q0_INIT = "init"                # Define problem
    Q1_HYPOTHESIZE = "hypothesize"  # Generate alternatives
    Q2_SUPPORT = "support"          # Gather evidence
    Q3_CHALLENGE = "challenge"      # Find counter-evidence
    Q4_AUDIT = "audit"              # Check biases
    Q5_DECIDE = "decide"            # Make decision

@dataclass
class QCycleState:
    phase: QPhase
    problem: str
    hypotheses: list[dict]  # {id, description, confidence}
    evidence: list[dict]    # {id, hypothesis_id, type, content, weight}
    challenges: list[dict]  # {id, hypothesis_id, content}
    audit_results: dict     # {biases_found, confidence_adjustments}
    decision: dict | None   # {hypothesis_id, rationale, confidence}

class QCycleRunner:
    """Run structured Q-Cycle reasoning."""
    def __init__(self, llm: LLMAdapter):
        self.llm = llm

    async def run_cycle(self, problem: str) -> QCycleState:
        """Run complete Q-Cycle."""
        state = QCycleState(
            phase=QPhase.Q0_INIT,
            problem=problem,
            hypotheses=[],
            evidence=[],
            challenges=[],
            audit_results={},
            decision=None,
        )
        state = await self._q0_init(state)         # Q0: Initialize
        state = await self._q1_hypothesize(state)  # Q1: Generate hypotheses
        state = await self._q2_support(state)      # Q2: Gather supporting evidence
        state = await self._q3_challenge(state)    # Q3: Find challenges
        state = await self._q4_audit(state)        # Q4: Audit for biases
        state = await self._q5_decide(state)       # Q5: Make decision
        return state

    async def _q1_hypothesize(self, state: QCycleState) -> QCycleState:
        """Generate competing hypotheses."""
        prompt = f"""
        Problem: {state.problem}

        Generate 3-5 distinct hypotheses/approaches to solve this problem.
        For each hypothesis:
        - Give it a unique ID (H1, H2, etc.)
        - Describe the approach
        - Assign initial confidence (0-1)

        Format as JSON:
        [{{"id": "H1", "description": "...", "confidence": 0.5}}, ...]
        """
        response = await self.llm.generate(prompt)
        state.hypotheses = json.loads(response)
        state.phase = QPhase.Q1_HYPOTHESIZE
        return state

    async def _q4_audit(self, state: QCycleState) -> QCycleState:
        """Audit for cognitive biases."""
        prompt = f"""
        Review these hypotheses and evidence for cognitive biases:

        Hypotheses:
        {json.dumps(state.hypotheses, indent=2)}

        Evidence:
        {json.dumps(state.evidence, indent=2)}

        Challenges:
        {json.dumps(state.challenges, indent=2)}

        Check for:
        - Confirmation bias (favoring evidence that supports preferred hypothesis)
        - Anchoring bias (over-weighting first hypothesis)
        - Availability bias (favoring easily recalled examples)
        - Overconfidence

        For each bias found, suggest confidence adjustments.
        Format:
        {{"biases_found": ["..."], "confidence_adjustments": {{"H1": -0.1, "H2": +0.1}}}}
        """
        response = await self.llm.generate(prompt)
        state.audit_results = json.loads(response)
        # Apply adjustments
        for h in state.hypotheses:
            adj = state.audit_results["confidence_adjustments"].get(h["id"], 0)
            h["confidence"] = max(0, min(1, h["confidence"] + adj))
        state.phase = QPhase.Q4_AUDIT
        return state
```
Effort: Medium-High
Verdict: YES - More rigorous than ad-hoc reasoning.
What it is: Old evidence loses weight over time. Stale evidence is marked.
Current BLACKICE approach: All evidence weighted equally.
Why adopt: Codebase changes. Old evidence may be invalid.
Implementation sketch:
```python
import sqlite3
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path

@dataclass
class Evidence:
    id: str
    content: str
    source: str
    created_at: datetime
    weight: float
    decay_rate: float = 0.1  # Lose 10% weight per week

    @property
    def current_weight(self) -> float:
        """Calculate decayed weight."""
        age = datetime.now() - self.created_at
        weeks = age.total_seconds() / (7 * 24 * 3600)
        decay_factor = (1 - self.decay_rate) ** weeks
        return self.weight * decay_factor

    @property
    def is_stale(self) -> bool:
        """Check if evidence is too old to be reliable."""
        return self.current_weight < 0.2

class EvidenceManager:
    """Manage evidence with decay."""
    def __init__(self, db_path: Path):
        self.db = sqlite3.connect(db_path)
        self._init_schema()

    def _init_schema(self):
        ...  # CREATE TABLE evidence (...)

    def add(self, evidence: Evidence):
        """Add new evidence."""
        self.db.execute("""
            INSERT INTO evidence (id, content, source, created_at, weight, decay_rate)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (evidence.id, evidence.content, evidence.source,
              evidence.created_at.isoformat(), evidence.weight, evidence.decay_rate))
        self.db.commit()

    def get_valid(self, hypothesis_id: str) -> list[Evidence]:
        """Get non-stale evidence for hypothesis."""
        cursor = self.db.execute("""
            SELECT id, content, source, created_at, weight, decay_rate
            FROM evidence WHERE hypothesis_id = ?
        """, (hypothesis_id,))
        rows = [
            Evidence(id=r[0], content=r[1], source=r[2],
                     created_at=datetime.fromisoformat(r[3]),
                     weight=r[4], decay_rate=r[5])
            for r in cursor.fetchall()
        ]
        # Decayed weight is a Python property, not a column, so filter/sort here
        valid = [e for e in rows if not e.is_stale]
        return sorted(valid, key=lambda e: e.current_weight, reverse=True)

    def mark_stale(self, evidence_id: str, reason: str):
        """Manually mark evidence as stale."""
        self.db.execute("""
            UPDATE evidence SET weight = 0, stale_reason = ? WHERE id = ?
        """, (reason, evidence_id))
        self.db.commit()

    def refresh(self, evidence_id: str, new_content: str):
        """Refresh evidence with new information."""
        self.db.execute("""
            UPDATE evidence SET content = ?, created_at = ?, weight = 1.0 WHERE id = ?
        """, (new_content, datetime.now().isoformat(), evidence_id))
        self.db.commit()

# Commands for evidence management
# /q-decay   - Show stale evidence
# /q-refresh - Refresh evidence from current code
```
Effort: Medium
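The decay curve is worth sanity-checking with concrete numbers. A standalone sketch of the same formula (with an explicit clock argument instead of `datetime.now()`, so the numbers are deterministic):

```python
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class Evidence:
    created_at: datetime
    weight: float
    decay_rate: float = 0.1  # lose 10% of remaining weight per week

    def weight_at(self, now: datetime) -> float:
        """Exponential decay: weight * (1 - decay_rate) ** weeks_old."""
        weeks = (now - self.created_at).total_seconds() / (7 * 24 * 3600)
        return self.weight * (1 - self.decay_rate) ** weeks

now = datetime(2026, 1, 8)
e = Evidence(created_at=now - timedelta(weeks=4), weight=1.0)
print(round(e.weight_at(now), 3))  # 0.656 -- i.e. 0.9 ** 4

# The 0.2 staleness threshold is crossed between weeks 15 and 16:
# 0.9 ** 15 ~= 0.206, 0.9 ** 16 ~= 0.185
e_old = Evidence(created_at=now - timedelta(weeks=16), weight=1.0)
print(e_old.weight_at(now) < 0.2)  # True
```

So at the default rate, evidence stays "valid" for roughly a quarter before `/q-refresh` or manual staleness marking is needed.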
Verdict: YES - Realistic evidence handling.
What it is: Numerical confidence on decisions with explicit calculation.
Current BLACKICE approach: Binary pass/fail.
Why adopt: Weight agent proposals in consensus. Detect overconfidence.
Implementation sketch:
```python
from dataclasses import dataclass
from pathlib import Path

@dataclass
class ConfidenceBreakdown:
    base_confidence: float      # From hypothesis generation
    evidence_support: float     # +/- from supporting evidence
    evidence_challenge: float   # +/- from challenging evidence
    bias_adjustment: float      # From audit
    historical_accuracy: float  # Past accuracy on similar decisions
    final_confidence: float

class ConfidenceCalculator:
    """Calculate and explain confidence scores."""
    def __init__(self, history_db: Path):
        self.history = HistoricalAccuracy(history_db)

    def calculate(
        self,
        hypothesis: dict,
        supporting: list[Evidence],
        challenging: list[Evidence],
        bias_adjustment: float = 0,
    ) -> ConfidenceBreakdown:
        """Calculate confidence with full breakdown."""
        base = hypothesis["confidence"]
        # Evidence support
        support_weight = sum(e.current_weight for e in supporting)
        evidence_support = min(0.3, support_weight * 0.1)
        # Evidence challenges
        challenge_weight = sum(e.current_weight for e in challenging)
        evidence_challenge = -min(0.3, challenge_weight * 0.1)
        # Historical accuracy
        similar_decisions = self.history.find_similar(hypothesis["description"])
        if similar_decisions:
            historical = sum(d.was_correct for d in similar_decisions) / len(similar_decisions)
            historical_adjustment = (historical - 0.5) * 0.2  # +/- 0.1 max
        else:
            historical_adjustment = 0
        final = base + evidence_support + evidence_challenge + bias_adjustment + historical_adjustment
        final = max(0, min(1, final))  # Clamp to [0, 1]
        return ConfidenceBreakdown(
            base_confidence=base,
            evidence_support=evidence_support,
            evidence_challenge=evidence_challenge,
            bias_adjustment=bias_adjustment,
            historical_accuracy=historical_adjustment,
            final_confidence=final,
        )

    def explain(self, breakdown: ConfidenceBreakdown) -> str:
        """Human-readable confidence explanation."""
        return f"""
        Confidence: {breakdown.final_confidence:.0%}
        Breakdown:
        - Base confidence: {breakdown.base_confidence:.0%}
        - Supporting evidence: {breakdown.evidence_support:+.0%}
        - Challenging evidence: {breakdown.evidence_challenge:+.0%}
        - Bias adjustment: {breakdown.bias_adjustment:+.0%}
        - Historical accuracy: {breakdown.historical_accuracy:+.0%}
        """
```
Effort: Medium
Verdict: YES - Explicit confidence is useful.
What it is: Every decision preserved with full rationale.
Current BLACKICE approach: Decisions in Beads events (less structured).
Why adopt: Audit trail. Learn from past decisions. Debug bad choices.
Implementation sketch:
```python
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Literal

@dataclass
class Decision:
    id: str
    task_id: str
    timestamp: datetime
    problem: str
    chosen_hypothesis: str
    alternatives_considered: list[str]
    rationale: str
    confidence: ConfidenceBreakdown
    evidence_used: list[str]
    outcome: Literal["pending", "success", "failure"] = "pending"
    outcome_notes: str | None = None

class DecisionStore:
    """Store and retrieve decision documents."""
    def __init__(self, base_path: Path):
        self.base_path = base_path / ".quint" / "decisions"
        self.base_path.mkdir(parents=True, exist_ok=True)

    def save(self, decision: Decision):
        """Save decision document."""
        path = self.base_path / f"{decision.id}.md"
        content = f"""# Decision: {decision.id}

## Problem
{decision.problem}

## Chosen Approach
{decision.chosen_hypothesis}

## Alternatives Considered
{chr(10).join(f"- {a}" for a in decision.alternatives_considered)}

## Rationale
{decision.rationale}

## Confidence
{self._format_confidence(decision.confidence)}

## Evidence Used
{chr(10).join(f"- {e}" for e in decision.evidence_used)}

## Outcome
Status: {decision.outcome}
{decision.outcome_notes or ""}

---
Timestamp: {decision.timestamp.isoformat()}
Task: {decision.task_id}
"""
        path.write_text(content)

    def record_outcome(self, decision_id: str, outcome: str, notes: str):
        """Record outcome for learning."""
        # Update the decision document;
        # also update the historical accuracy database
        ...

    def find_similar(self, problem: str) -> list[Decision]:
        """Find past decisions on similar problems."""
        # Search through decision documents
        # and return relevant past decisions
        ...
```
Effort: Medium
Verdict: YES - Better than unstructured events.
Why skip: BLACKICE has its own architecture.
Why skip: BLACKICE has its own CLI design.
| Feature | Worth Adopting? | Effort | Priority |
|---|---|---|---|
| Q-Cycle Structured Reasoning | YES | Medium | High |
| Confidence Scoring | YES | Medium | High |
| Decision Documents | YES | Medium | Medium |
| Evidence Decay | YES | Medium | Low |
<!-- Source Gist 7 of 19: 6a08ce38cb1dd646e0bce1e405e9c709 -->
Gentleman-Guardian-Angel Ideas for BLACKICE
Ideas from Gentleman Guardian Angel for BLACKICE.
A provider-agnostic AI code review tool that runs as a git pre-commit hook, validating staged files against project standards.
| Aspect | Guardian Angel | BLACKICE |
|---|---|---|
| Focus | Pre-commit code review | Iterate-until-success |
| Integration | Git hooks | CLI |
| Providers | Claude, Gemini, Ollama, any CLI | Claude, Ollama, Letta |
| Dependencies | Pure Bash | Python |
- Provider Agnostic - Works with any CLI-based AI
- Git-Native - Standard pre-commit/commit-msg hooks
- File Pattern Matching - Include/exclude specific file types
- Intelligent Caching - Skip unchanged files
- Zero Dependencies - Pure Bash implementation
What it is: Run AI review as part of git workflow automatically.
Current BLACKICE approach: Manual invocation only.
Why adopt: Enforce quality at commit time. No forgotten reviews.
Implementation sketch:
```python
#!/usr/bin/env python3
# blackice-hooks/pre-commit
"""Pre-commit hook for BLACKICE code review."""
import subprocess
import sys
from pathlib import Path

def get_staged_files() -> list[Path]:
    """Get list of staged files."""
    result = subprocess.run(
        ["git", "diff", "--cached", "--name-only", "--diff-filter=ACM"],
        capture_output=True, text=True,
    )
    return [Path(f) for f in result.stdout.strip().split("\n") if f]

def should_review(file: Path, patterns: list[str]) -> bool:
    """Check if file matches review patterns."""
    for pattern in patterns:
        if file.match(pattern):
            return True
    return False

def run_review(files: list[Path]) -> tuple[bool, str]:
    """Run BLACKICE review on files."""
    from blackice import QuickReviewer
    reviewer = QuickReviewer()
    results = []
    for file in files:
        result = reviewer.review(file)
        results.append((file, result))
    # Check for blocking issues
    blocking = [r for f, r in results if r.severity == "error"]
    if blocking:
        return False, format_issues(blocking)  # format_issues: render issues for display
    return True, ""

def main():
    files = get_staged_files()
    patterns = ["*.py", "*.ts", "*.js", "*.tsx", "*.jsx"]
    reviewable = [f for f in files if should_review(f, patterns)]
    if not reviewable:
        sys.exit(0)
    print(f"🔍 Reviewing {len(reviewable)} files...")
    passed, message = run_review(reviewable)
    if not passed:
        print(f"❌ Review failed:\n{message}")
        print("\nFix issues or use --no-verify to skip")
        sys.exit(1)
    print("✅ Review passed")
    sys.exit(0)

if __name__ == "__main__":
    main()
```

```bash
#!/bin/bash
# Installation script: blackice hooks install
HOOK_DIR=".git/hooks"
PRE_COMMIT="$HOOK_DIR/pre-commit"
cat > "$PRE_COMMIT" << 'EOF'
#!/bin/bash
python3 -m blackice.hooks.pre_commit
EOF
chmod +x "$PRE_COMMIT"
echo "✅ Pre-commit hook installed"
```
Effort: Low
Verdict: YES - Automatic quality enforcement.
What it is: Hash-based cache that skips unchanged files.
Current BLACKICE approach: No review caching.
Why adopt: Don't re-review unchanged files. Faster commits.
Implementation sketch:
```python
import hashlib
import json
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path

@dataclass
class CacheEntry:
    file_hash: str
    rules_hash: str
    result: str
    timestamp: datetime

class ReviewCache:
    """Content-addressable cache for code reviews."""
    def __init__(self, cache_dir: Path | None = None):
        self.cache_dir = cache_dir or Path.home() / ".cache" / "blackice" / "reviews"
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def _hash_file(self, file: Path) -> str:
        """Hash file contents."""
        return hashlib.sha256(file.read_bytes()).hexdigest()

    def _hash_rules(self, rules_file: Path) -> str:
        """Hash rules file to detect rule changes."""
        if not rules_file.exists():
            return "default"
        return hashlib.sha256(rules_file.read_bytes()).hexdigest()[:16]

    def _cache_key(self, file: Path, rules_hash: str) -> str:
        """Generate cache key from file hash + rules hash."""
        file_hash = self._hash_file(file)
        return f"{file_hash[:16]}_{rules_hash}"

    def get(self, file: Path, rules_file: Path) -> str | None:
        """Get cached review result if valid."""
        rules_hash = self._hash_rules(rules_file)
        key = self._cache_key(file, rules_hash)
        cache_file = self.cache_dir / f"{key}.json"
        if cache_file.exists():
            entry = CacheEntry(**json.loads(cache_file.read_text()))
            # Verify hashes still match
            if entry.file_hash == self._hash_file(file):
                return entry.result
            # Cache invalidated by content change
            cache_file.unlink()
        return None

    def set(self, file: Path, rules_file: Path, result: str):
        """Cache review result."""
        rules_hash = self._hash_rules(rules_file)
        key = self._cache_key(file, rules_hash)
        entry = CacheEntry(
            file_hash=self._hash_file(file),
            rules_hash=rules_hash,
            result=result,
            timestamp=datetime.now(),
        )
        cache_file = self.cache_dir / f"{key}.json"
        cache_file.write_text(json.dumps(entry.__dict__, default=str))

    def invalidate_all(self):
        """Clear entire cache (e.g., when rules change)."""
        for f in self.cache_dir.glob("*.json"):
            f.unlink()

# Usage in reviewer
cache = ReviewCache()
for file in files_to_review:
    cached = cache.get(file, rules_file)
    if cached:
        print(f"⏭️ {file} (cached)")
        continue
    result = await review_file(file)
    cache.set(file, rules_file, result)
    print(f"✅ {file} reviewed")
```
Effort: Low
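The key construction can be demonstrated standalone: any change to either the content or the rules yields a different key, so stale entries are simply never looked up (hashing raw bytes here rather than files, to keep the example self-contained):

```python
import hashlib

def cache_key(file_bytes: bytes, rules_bytes: bytes) -> str:
    """Key = first 16 hex chars of the content hash + 16 of the rules hash."""
    file_hash = hashlib.sha256(file_bytes).hexdigest()
    rules_hash = hashlib.sha256(rules_bytes).hexdigest()[:16]
    return f"{file_hash[:16]}_{rules_hash}"

k1 = cache_key(b"def f(): pass\n", b"# rules v1")
k2 = cache_key(b"def f(): pass\n", b"# rules v1")
k3 = cache_key(b"def f(): return 1\n", b"# rules v1")
k4 = cache_key(b"def f(): pass\n", b"# rules v2")
print(k1 == k2)  # True  -- same content, same rules: cache hit
print(k1 == k3)  # False -- content changed
print(k1 == k4)  # False -- rules changed
```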
Verdict: YES - Faster reviews.
What it is: Project standards defined in a separate file, not hardcoded.
Current BLACKICE approach: Prompts embedded in code.
Why adopt: Easy to update rules without code changes. Version controlled.
Implementation sketch:
```markdown
<!-- AGENTS.md - Project coding standards -->
# Code Review Standards

## Required Patterns
- All functions must have docstrings
- Type hints required for function parameters and returns
- Maximum function length: 50 lines
- Maximum file length: 500 lines

## Forbidden Patterns
- No `print()` statements in production code
- No hardcoded credentials or API keys
- No `TODO` comments older than 30 days
- No unused imports

## Style Guidelines
- Use f-strings over .format() or %
- Prefer list comprehensions over map/filter
- Use pathlib over os.path
- snake_case for functions, PascalCase for classes

## Security Requirements
- Sanitize all user input
- Use parameterized queries for SQL
- Validate file paths to prevent traversal
- No shell=True in subprocess calls

## Test Requirements
- All public functions must have tests
- Minimum coverage: 80%
- Use pytest, not unittest
```

```python
from pathlib import Path

class RulesLoader:
    """Load rules from external file."""
    DEFAULT_PATH = Path("AGENTS.md")

    def load(self, path: Path | None = None) -> str:
        """Load rules file as prompt context."""
        path = path or self.DEFAULT_PATH
        if not path.exists():
            return self._default_rules()
        content = path.read_text()
        return self._parse_rules(content)

    def _parse_rules(self, content: str) -> str:
        """Parse markdown rules into structured prompt."""
        # Keep as markdown - LLMs understand it well
        return f"""
You are a code reviewer. Apply these project-specific standards:

{content}

Review the following code and identify any violations of these standards.
Format your response as:
- ❌ VIOLATION: <description> (line X)
- ⚠️ WARNING: <description>
- ✅ PASS if no issues found
"""

    def _default_rules(self) -> str:
        """Default rules if no file exists."""
        return """
You are a code reviewer. Check for:
- Code quality and readability
- Potential bugs or errors
- Security issues
- Performance problems
Be constructive but thorough.
"""
```
Effort: Low
Verdict: YES - Configurable rules.
What it is: Fail CI on ambiguous AI responses.
Current BLACKICE approach: Trust AI output.
Why adopt: Don't let unclear reviews pass. Human must resolve ambiguity.
Implementation sketch:
```python
import sys
from dataclasses import dataclass
from typing import Literal

@dataclass
class ReviewResult:
    status: Literal["pass", "fail", "ambiguous"]
    issues: list[str]
    raw_response: str

class StrictModeReviewer:
    """Reviewer with strict mode for CI."""
    PASS_INDICATORS = ["✅", "PASS", "no issues", "looks good", "approved"]
    FAIL_INDICATORS = ["❌", "FAIL", "violation", "error", "must fix"]
    AMBIGUOUS_INDICATORS = ["might", "could", "consider", "possibly", "unclear"]

    def __init__(self, strict: bool = False):
        self.strict = strict

    def parse_result(self, response: str) -> ReviewResult:
        """Parse AI response into structured result."""
        response_lower = response.lower()
        # Check for clear pass
        if any(ind.lower() in response_lower for ind in self.PASS_INDICATORS):
            has_fail = any(ind.lower() in response_lower for ind in self.FAIL_INDICATORS)
            if not has_fail:
                return ReviewResult("pass", [], response)
        # Check for clear fail
        if any(ind.lower() in response_lower for ind in self.FAIL_INDICATORS):
            issues = self._extract_issues(response)
            return ReviewResult("fail", issues, response)
        # Check for ambiguous
        if self.strict and any(ind in response_lower for ind in self.AMBIGUOUS_INDICATORS):
            return ReviewResult("ambiguous", ["Response was ambiguous"], response)
        # Default based on mode
        if self.strict:
            return ReviewResult("ambiguous", ["Could not determine result"], response)
        return ReviewResult("pass", [], response)

    def _extract_issues(self, response: str) -> list[str]:
        """Extract issue descriptions from response."""
        issues = []
        for line in response.split("\n"):
            if any(ind in line for ind in ["❌", "VIOLATION", "ERROR"]):
                issues.append(line.strip())
        return issues

# CI usage
reviewer = StrictModeReviewer(strict=True)
result = reviewer.parse_result(ai_response)
if result.status == "ambiguous":
    print("⚠️ Review result was ambiguous. Manual review required.")
    print(f"Raw response:\n{result.raw_response}")
    sys.exit(1)
```
Effort: Low
Verdict: YES - Safer for CI pipelines.
What it is: Pure Bash implementation with no runtime dependencies.
Current BLACKICE approach: Python with many dependencies.
Why adopt: Works anywhere. No installation friction.
BUT: This is a design choice, not a feature. BLACKICE is Python.
Verdict: NO - Python is fine. Don't rewrite in Bash.
Why skip: BLACKICE is Python-native. Bash limits functionality.
Why skip: BLACKICE needs proper package structure.
| Feature | Worth Adopting? | Effort | Priority |
|---|---|---|---|
| Git Hook Integration | YES | Low | High |
| Content-Addressable Cache | YES | Low | High |
| External Rules File | YES | Low | Medium |
| Strict Mode for CI | YES | Low | Medium |
| Zero Dependencies | NO | - | - |
<!-- Source Gist 8 of 19: 3fe6e9c14fbaab1a04ac6c04e9b12cc8 -->
Auto-Claude Ideas for BLACKICE
Ideas from Auto-Claude for BLACKICE.
An autonomous multi-agent AI coding framework that orchestrates planning, implementation, QA, and deployment without continuous human intervention.
| Aspect | Auto-Claude | BLACKICE |
|---|---|---|
| Focus | Autonomous end-to-end | Iterate-until-success |
| Isolation | Git worktrees | Worktree pool |
| Parallelism | Up to 12 terminals | Worker pool |
| QA | Built-in validation loop | Consensus voting |
| License | AGPL-3.0 | MIT |
- Multi-Layer Agent System - Planning, implementation, QA, merge agents
- Dynamic Command Allowlisting - Stack-aware command restrictions
- Three-Layer Security Sandbox - OS, filesystem, command filtering
- Self-Validating QA Loop - Catches issues before human review
- Memory Persistence - Insights retained across sessions
What it is: Detect project stack and restrict commands to those relevant.
Current BLACKICE approach: Static command restrictions.
Why adopt: Python projects shouldn't run npm. Node projects shouldn't run pip. Reduce attack surface.
Implementation sketch:
from dataclasses import dataclass
from pathlib import Path
@dataclass
class StackProfile:
name: str
indicators: list[str] # Files that indicate this stack
allowed_commands: list[str]
package_managers: list[str]
test_commands: list[str]
build_commands: list[str]
STACK_PROFILES = [
StackProfile(
name="python",
indicators=["pyproject.toml", "setup.py", "requirements.txt", "Pipfile"],
allowed_commands=["python", "python3", "pip", "uv", "pytest", "ruff", "mypy"],
package_managers=["pip", "uv", "pipenv", "poetry"],
test_commands=["pytest", "python -m pytest", "python -m unittest"],
build_commands=["python -m build", "pip wheel"]
),
StackProfile(
name="node",
indicators=["package.json", "yarn.lock", "pnpm-lock.yaml"],
allowed_commands=["node", "npm", "npx", "yarn", "pnpm", "bun", "tsx"],
package_managers=["npm", "yarn", "pnpm", "bun"],
test_commands=["npm test", "yarn test", "jest", "vitest"],
build_commands=["npm run build", "yarn build"]
),
StackProfile(
name="rust",
indicators=["Cargo.toml"],
allowed_commands=["cargo", "rustc", "rustup", "rustfmt", "clippy"],
package_managers=["cargo"],
test_commands=["cargo test"],
build_commands=["cargo build"]
),
StackProfile(
name="go",
indicators=["go.mod", "go.sum"],
allowed_commands=["go", "gofmt", "golint"],
package_managers=["go mod"],
test_commands=["go test"],
build_commands=["go build"]
),
]
class StackDetector:
"""Detect project stack from files."""
def detect(self, project_root: Path) -> list[StackProfile]:
"""Detect all stacks in project."""
detected = []
for profile in STACK_PROFILES:
for indicator in profile.indicators:
if (project_root / indicator).exists():
detected.append(profile)
break
return detected
class DynamicAllowlist:
"""Restrict commands based on detected stack."""
def __init__(self, project_root: Path):
self.detector = StackDetector()
self.stacks = self.detector.detect(project_root)
self.allowed = self._build_allowlist()
def _build_allowlist(self) -> set[str]:
"""Build combined allowlist from all detected stacks."""
allowed = {"git", "ls", "cat", "grep", "find", "mkdir", "cp", "mv"} # Always allowed
for stack in self.stacks:
allowed.update(stack.allowed_commands)
return allowed
def is_allowed(self, command: str) -> bool:
"""Check if command is allowed for this project."""
# Extract base command
parts = command.split()
if not parts:
return False
base_cmd = parts[0]
return base_cmd in self.allowed
def get_test_command(self) -> str | None:
"""Get appropriate test command for stack."""
if self.stacks:
return self.stacks[0].test_commands[0]
return None
Effort: Medium
Verdict: YES - Smarter command restrictions.
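The sketch above can be exercised end to end. Below is a minimal self-contained version (trimmed profiles, hypothetical indicator files) that detects a Python-only project and unions the allowlists:

```python
import tempfile
from dataclasses import dataclass
from pathlib import Path

@dataclass
class StackProfile:
    name: str
    indicators: list[str]       # Files that indicate this stack
    allowed_commands: list[str]

PROFILES = [
    StackProfile("python", ["pyproject.toml", "requirements.txt"], ["python", "pip", "pytest"]),
    StackProfile("node", ["package.json"], ["node", "npm", "npx"]),
]

BASELINE = {"git", "ls", "cat", "grep"}  # always allowed

def build_allowlist(project_root: Path) -> set[str]:
    """Union the command lists of every stack whose indicator file exists."""
    allowed = set(BASELINE)
    for profile in PROFILES:
        if any((project_root / f).exists() for f in profile.indicators):
            allowed.update(profile.allowed_commands)
    return allowed

# Simulate a Python-only project in a temp directory.
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "pyproject.toml").touch()
    allowed = build_allowlist(Path(tmp))
```

With only pyproject.toml present, pytest is allowed while npm stays blocked, which is exactly the attack-surface reduction argued for above.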
What it is: Automatically test generated code before flagging for human review.
Current BLACKICE approach: Consensus votes on correctness.
Why adopt: Don't waste human time on broken code. Catch issues early.
Implementation sketch:
from dataclasses import dataclass
from pathlib import Path
@dataclass
class QAResult:
passed: bool
build_status: bool
test_status: bool
lint_status: bool
coverage: float
issues: list[str]
class SelfValidatingQA:
"""Automatic QA before human review."""
def __init__(self, project_root: Path, allowlist: DynamicAllowlist):
self.root = project_root
self.allowlist = allowlist
async def validate(self, changes: list[Path]) -> QAResult:
"""Run full QA pipeline on changes."""
issues = []
# 1. Lint check
lint_result = await self._run_lint(changes)
if not lint_result.passed:
issues.extend(lint_result.issues)
# 2. Type check (if applicable)
type_result = await self._run_typecheck(changes)
if not type_result.passed:
issues.extend(type_result.issues)
# 3. Build check
build_result = await self._run_build()
if not build_result.passed:
issues.extend(build_result.issues)
# Don't proceed to tests if build fails
return QAResult(
passed=False,
build_status=False,
test_status=False,
lint_status=lint_result.passed,
coverage=0,
issues=issues
)
# 4. Test run
test_result = await self._run_tests()
if not test_result.passed:
issues.extend(test_result.issues)
# 5. Coverage check
coverage = await self._get_coverage()
return QAResult(
passed=len(issues) == 0,
build_status=build_result.passed,
test_status=test_result.passed,
lint_status=lint_result.passed,
coverage=coverage,
issues=issues
)
async def _run_lint(self, files: list[Path]):
"""Run linter on changed files."""
if "ruff" in self.allowlist.allowed:
cmd = f"ruff check {' '.join(str(f) for f in files)}"
elif "eslint" in self.allowlist.allowed:
cmd = f"eslint {' '.join(str(f) for f in files)}"
else:
return LintResult(passed=True, issues=[])
result = await self._exec(cmd)
return self._parse_lint_output(result)
async def fix_and_retry(self, qa_result: QAResult, changes: list[Path], agent) -> QAResult:
"""Have agent fix issues and re-run QA on the same changed files."""
if qa_result.passed:
return qa_result
# Give agent the issues to fix
fix_prompt = f"""
The following QA issues were found:
{chr(10).join(f"- {issue}" for issue in qa_result.issues)}
Please fix these issues. Do not change functionality, only fix the issues.
"""
await agent.run(fix_prompt)
# Re-run QA (QAResult does not carry the file list, so pass it in)
return await self.validate(changes)
Effort: Medium
Verdict: YES - Reduces human review burden.
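The pipeline's key design choice, skipping the test stage entirely when the build fails, can be sketched as a toy driver (QAResult fields are trimmed and `run_qa` is a synchronous stand-in for the real async pipeline):

```python
from dataclasses import dataclass, field

@dataclass
class QAResult:
    passed: bool
    build_status: bool
    test_status: bool
    issues: list[str] = field(default_factory=list)

def run_qa(lint_issues: list[str], build_ok: bool, test_issues: list[str]) -> QAResult:
    """Short-circuit: a failed build reports immediately without running tests."""
    issues = list(lint_issues)
    if not build_ok:
        return QAResult(passed=False, build_status=False, test_status=False,
                        issues=issues + ["build failed"])
    issues.extend(test_issues)
    return QAResult(passed=not issues, build_status=True,
                    test_status=not test_issues, issues=issues)

broken = run_qa(["E501 line too long"], build_ok=False, test_issues=[])
clean = run_qa([], build_ok=True, test_issues=[])
```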
What it is: Agents remember codebase insights across sessions.
Current BLACKICE approach: Beads stores events, but not structured insights.
Why adopt: Don't re-learn the same things. Faster subsequent runs.
Implementation sketch:
import re
import sqlite3
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
@dataclass
class CodebaseInsight:
category: str # "pattern", "antipattern", "preference", "constraint"
description: str
confidence: float
source: str # Where this was learned
created_at: datetime
last_used: datetime
class InsightMemory:
"""Persistent memory of codebase insights."""
def __init__(self, db_path: Path):
self.db = sqlite3.connect(db_path)
self.db.row_factory = sqlite3.Row  # access columns by name in get_relevant()
self._init_schema()
def _init_schema(self):
self.db.execute("""
CREATE TABLE IF NOT EXISTS insights (
id TEXT PRIMARY KEY,
category TEXT,
description TEXT,
confidence REAL,
source TEXT,
created_at TEXT,
last_used TEXT,
use_count INTEGER DEFAULT 0
)
""")
def add_insight(self, insight: CodebaseInsight):
"""Add or update insight."""
self.db.execute("""
INSERT INTO insights (id, category, description, confidence, source, created_at, last_used)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
confidence = (confidence + excluded.confidence) / 2,
last_used = excluded.last_used,
use_count = use_count + 1
""", (
self._hash(insight.description),
insight.category,
insight.description,
insight.confidence,
insight.source,
insight.created_at.isoformat(),
insight.last_used.isoformat()
))
self.db.commit()
def get_relevant(self, context: str, limit: int = 10) -> list[CodebaseInsight]:
"""Get insights relevant to current context."""
# Simple keyword matching (could use embeddings)
keywords = set(context.lower().split())
cursor = self.db.execute("""
SELECT * FROM insights
ORDER BY confidence DESC, use_count DESC
LIMIT ?
""", (limit * 3,)) # Over-fetch, then filter
results = []
for row in cursor.fetchall():
desc_keywords = set(row["description"].lower().split())
if keywords & desc_keywords: # Any overlap
results.append(CodebaseInsight(
category=row["category"],
description=row["description"],
confidence=row["confidence"],
source=row["source"],
created_at=datetime.fromisoformat(row["created_at"]),
last_used=datetime.fromisoformat(row["last_used"]),
))
return results[:limit]
def format_for_prompt(self, insights: list[CodebaseInsight]) -> str:
"""Format insights for agent prompt."""
if not insights:
return ""
lines = ["## Codebase Insights (from previous sessions)", ""]
for i in insights:
lines.append(f"- **{i.category}**: {i.description} (confidence: {i.confidence:.0%})")
return "\n".join(lines)
# Auto-learn insights from agent conversations
class InsightExtractor:
"""Extract insights from agent outputs."""
INSIGHT_PATTERNS = [
(r"I noticed that this codebase (.+)", "pattern"),
(r"This project (?:prefers|uses) (.+)", "preference"),
(r"Avoid (.+) because (.+)", "antipattern"),
(r"This codebase requires (.+)", "constraint"),
]
def extract(self, agent_output: str) -> list[CodebaseInsight]:
insights = []
for pattern, category in self.INSIGHT_PATTERNS:
matches = re.findall(pattern, agent_output, re.IGNORECASE)
for match in matches:
insights.append(CodebaseInsight(
category=category,
description=match if isinstance(match, str) else " ".join(match),
confidence=0.7, # Initial confidence
source="agent_output",
created_at=datetime.now(),
last_used=datetime.now()
))
return insights
Effort: Medium
Verdict: YES - Learn and remember.
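The InsightExtractor's regex mining can be checked standalone. This trimmed sketch keeps two of the patterns and returns plain (category, description) tuples rather than full CodebaseInsight objects:

```python
import re

# Trimmed pattern table from the sketch above.
INSIGHT_PATTERNS = [
    (r"I noticed that this codebase (.+)", "pattern"),
    (r"This project (?:prefers|uses) (.+)", "preference"),
]

def extract(agent_output: str) -> list[tuple[str, str]]:
    """Return (category, description) pairs mined from free-form agent output."""
    found = []
    for pattern, category in INSIGHT_PATTERNS:
        for match in re.findall(pattern, agent_output, re.IGNORECASE):
            found.append((category, match))
    return found

output = ("I noticed that this codebase uses dependency injection everywhere.\n"
          "This project prefers pytest over unittest.")
insights = extract(output)
```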
What it is: OS isolation + filesystem restrictions + command filtering.
Current BLACKICE approach: Command filtering only.
Why adopt: Defense in depth. Multiple layers of protection.
Implementation sketch:
import asyncio
import os
import re
import tempfile
from pathlib import Path
class SecuritySandbox:
"""Three-layer security sandbox for agent execution."""
def __init__(self, project_root: Path, allowlist: DynamicAllowlist):
self.root = project_root.resolve()
self.allowlist = allowlist
self.allowed_paths = self._build_allowed_paths()
def _build_allowed_paths(self) -> set[Path]:
"""Build set of paths agent can access."""
allowed = {
self.root,
Path(tempfile.gettempdir()),
Path.home() / ".cache",
}
# Add all subdirectories of project
for p in self.root.rglob("*"):
if p.is_dir():
allowed.add(p)
return allowed
# Layer 1: Filesystem restrictions
def check_path(self, path: str | Path) -> bool:
"""Check if path is within allowed boundaries."""
try:
resolved = Path(path).resolve()
# Never allow system paths
forbidden = ["/etc", "/usr", "/bin", "/sbin", "/var", "/root"]
if any(str(resolved).startswith(f) for f in forbidden):
return False
# Check against allowed paths
for allowed in self.allowed_paths:
try:
resolved.relative_to(allowed)
return True
except ValueError:
continue
return False
except Exception:
return False
# Layer 2: Command filtering
def check_command(self, command: str) -> tuple[bool, str | None]:
"""Check if command is allowed."""
# First check allowlist
if not self.allowlist.is_allowed(command):
return False, "Command not allowed for this project stack"
# Then check for dangerous patterns
dangerous = [
(r"rm\s+-rf\s+/", "Recursive delete of root"),
(r">\s*/etc/", "Write to /etc"),
(r"chmod\s+777", "Insecure permissions"),
(r"\|\s*sh", "Pipe to shell"),
]
for pattern, reason in dangerous:
if re.search(pattern, command):
return False, reason
return True, None
# Layer 3: Environment isolation
def get_safe_env(self) -> dict:
"""Get sanitized environment for subprocess."""
safe_env = {}
allowed_vars = [
"PATH", "HOME", "USER", "LANG", "LC_ALL",
"PYTHONPATH", "NODE_PATH", "GOPATH",
"TERM", "SHELL",
]
for var in allowed_vars:
if var in os.environ:
safe_env[var] = os.environ[var]
# Remove potentially dangerous vars
safe_env.pop("LD_PRELOAD", None)
safe_env.pop("LD_LIBRARY_PATH", None)
return safe_env
async def execute(self, command: str) -> tuple[bool, str]:
"""Execute command within sandbox."""
# Check command
allowed, reason = self.check_command(command)
if not allowed:
return False, f"Blocked: {reason}"
# Execute with restrictions
try:
result = await asyncio.create_subprocess_shell(
command,
cwd=self.root,
env=self.get_safe_env(),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await result.communicate()
return result.returncode == 0, stdout.decode() + stderr.decode()
except Exception as e:
return False, str(e)
Effort: Medium
Verdict: YES - Essential for production.
Why skip: Too restrictive for some use cases. BLACKICE is MIT.
Why skip: CLI is more flexible. Desktop app is unnecessary.
| Feature | Worth Adopting? | Effort | Priority |
|---|---|---|---|
| Dynamic Command Allowlisting | YES | Medium | High |
| Self-Validating QA Loop | YES | Medium | High |
| Memory Persistence | YES | Medium | Medium |
| Three-Layer Sandbox | YES | Medium | Medium |
<!-- Source Gist 9 of 19: ea58818ae51813ac3f0f821dd7f77cc0 -->
Continuous-Claude-v2 Ideas for BLACKICE
Ideas from Continuous Claude v2 for BLACKICE.
A lossless state preservation system that maintains project continuity across sessions through ledgers, handoffs, and artifact indexing.
| Aspect | Continuous Claude | BLACKICE |
|---|---|---|
| Focus | Session continuity | Iterate-until-success |
| Memory | Ledgers + Handoffs | Beads event store |
| Learning | Artifact index + Braintrust | Reflexion |
| Agents | Plan → Validate → Implement | Supervisor + Workers |
- Continuity Ledger - Lossless session state snapshots
- Handoff System - Structured session transfer documents
- Artifact Index - SQLite+FTS5 searchable database
- 10 Hook Types - Lifecycle event interception
- TDD Workflow - Test-first implementation
What it is: Lossless state snapshots instead of lossy compaction.
Current BLACKICE approach: Beads events (similar, but less structured).
Why adopt: Explicit ledger format is easier to read/debug than event replay.
Implementation sketch:
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Literal
import yaml
@dataclass
class LedgerEntry:
timestamp: datetime
phase: str
status: Literal["started", "completed", "blocked", "failed"]
decision: str | None
rationale: str | None
artifacts: list[str]
learnings: list[str]
@dataclass
class ContinuityLedger:
task_id: str
tech_stack: dict
phases: list[LedgerEntry]
decisions: list[dict]
learnings: list[str]
rules_generated: list[str]
class LedgerManager:
"""Manage continuity ledgers."""
def __init__(self, base_path: Path):
self.base_path = base_path
def get_ledger_path(self, task_id: str) -> Path:
return self.base_path / "thoughts" / "ledgers" / f"CONTINUITY_{task_id}.md"
def load(self, task_id: str) -> ContinuityLedger | None:
path = self.get_ledger_path(task_id)
if not path.exists():
return None
return self._parse_ledger(path.read_text())
def save(self, ledger: ContinuityLedger):
path = self.get_ledger_path(ledger.task_id)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(self._format_ledger(ledger))
def append_entry(self, task_id: str, entry: LedgerEntry):
ledger = self.load(task_id) or ContinuityLedger(task_id, {}, [], [], [], [])
ledger.phases.append(entry)
self.save(ledger)
def _format_ledger(self, ledger: ContinuityLedger) -> str:
lines = [
f"# Continuity Ledger: {ledger.task_id}",
"",
"## Tech Stack",
yaml.dump(ledger.tech_stack),
"",
"## Phases",
]
for entry in ledger.phases:
lines.append(f"\n### {entry.phase} ({entry.status})")
lines.append(f"- **Time**: {entry.timestamp}")
if entry.decision:
lines.append(f"- **Decision**: {entry.decision}")
if entry.rationale:
lines.append(f"- **Rationale**: {entry.rationale}")
if entry.learnings:
lines.append("- **Learnings**:")
for l in entry.learnings:
lines.append(f" - {l}")
lines.extend([
"",
"## Accumulated Learnings",
*[f"- {l}" for l in ledger.learnings],
"",
"## Generated Rules",
*[f"- {r}" for r in ledger.rules_generated],
])
return "\n".join(lines)
Effort: Medium
Verdict: YES - Better than raw event streams.
What it is: Structured documents enabling agent-to-agent or session-to-session context transfer.
Current BLACKICE approach: No explicit handoff mechanism.
Why adopt: Clean context transfer. Multi-agent coordination.
Implementation sketch:
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from uuid import uuid4
@dataclass
class Handoff:
id: str
from_agent: str
to_agent: str | None # None = next session
created_at: datetime
# Context
task_summary: str
current_phase: str
completed_work: list[str]
remaining_work: list[str]
# State
key_decisions: list[dict]
open_questions: list[str]
blockers: list[str]
# Artifacts
artifacts_created: list[str]
files_modified: list[str]
# Learnings
what_worked: list[str]
what_didnt: list[str]
recommendations: list[str]
class HandoffManager:
"""Manage agent handoffs."""
def __init__(self, base_path: Path):
self.base_path = base_path / "thoughts" / "shared" / "handoffs"
self.base_path.mkdir(parents=True, exist_ok=True)
def create_handoff(self, agent_id: str, task: Task, state: dict) -> Handoff:
"""Create handoff from current agent state."""
handoff = Handoff(
id=f"handoff-{uuid4().hex[:8]}",
from_agent=agent_id,
to_agent=None,
created_at=datetime.now(),
task_summary=task.description,
current_phase=state.get("phase", "unknown"),
completed_work=state.get("completed", []),
remaining_work=state.get("remaining", []),
key_decisions=state.get("decisions", []),
open_questions=state.get("questions", []),
blockers=state.get("blockers", []),
artifacts_created=state.get("artifacts", []),
files_modified=state.get("files", []),
what_worked=state.get("worked", []),
what_didnt=state.get("failed", []),
recommendations=state.get("recommendations", [])
)
self._save(handoff)
return handoff
def get_latest(self, task_id: str) -> Handoff | None:
"""Get most recent handoff for task."""
pattern = f"*{task_id}*.md"
handoffs = sorted(self.base_path.glob(pattern), key=lambda p: p.stat().st_mtime)
if not handoffs:
return None
return self._load(handoffs[-1])
def to_prompt(self, handoff: Handoff) -> str:
"""Convert handoff to agent prompt."""
return f"""
## Handoff from Previous Session
### Task Summary
{handoff.task_summary}
### Current Phase
{handoff.current_phase}
### Completed Work
{chr(10).join(f"- {w}" for w in handoff.completed_work)}
### Remaining Work
{chr(10).join(f"- {w}" for w in handoff.remaining_work)}
### Key Decisions Made
{chr(10).join(f"- {d['decision']}: {d['rationale']}" for d in handoff.key_decisions)}
### Open Questions
{chr(10).join(f"- {q}" for q in handoff.open_questions)}
### Recommendations
{chr(10).join(f"- {r}" for r in handoff.recommendations)}
---
Continue from where the previous session left off.
"""
Effort: Medium
Verdict: YES - Essential for multi-agent coordination.
What it is: Searchable database of agent outputs, decisions, and patterns.
Current BLACKICE approach: Beads stores events but limited search.
Why adopt: Fast full-text search across all historical artifacts.
Implementation sketch:
import json
import sqlite3
from datetime import datetime
from pathlib import Path
class ArtifactIndex:
"""SQLite+FTS5 searchable artifact index."""
def __init__(self, db_path: Path):
self.conn = sqlite3.connect(db_path)
self.conn.row_factory = sqlite3.Row  # needed so dict(row) works below
self._init_schema()
def _init_schema(self):
self.conn.executescript("""
CREATE TABLE IF NOT EXISTS artifacts (
id TEXT PRIMARY KEY,
task_id TEXT,
type TEXT,
title TEXT,
content TEXT,
created_at TIMESTAMP,
metadata JSON
);
CREATE VIRTUAL TABLE IF NOT EXISTS artifacts_fts USING fts5(
title, content, task_id,
content='artifacts',
content_rowid='rowid'
);
CREATE TRIGGER IF NOT EXISTS artifacts_ai AFTER INSERT ON artifacts BEGIN
INSERT INTO artifacts_fts(rowid, title, content, task_id)
VALUES (new.rowid, new.title, new.content, new.task_id);
END;
""")
def add(self, artifact: dict):
"""Add artifact to index."""
self.conn.execute("""
INSERT INTO artifacts (id, task_id, type, title, content, created_at, metadata)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (
artifact["id"],
artifact["task_id"],
artifact["type"],
artifact["title"],
artifact["content"],
datetime.now().isoformat(),
json.dumps(artifact.get("metadata", {}))
))
self.conn.commit()
def search(self, query: str, limit: int = 10) -> list[dict]:
"""Full-text search across artifacts."""
cursor = self.conn.execute("""
SELECT artifacts.*, highlight(artifacts_fts, 1, '<mark>', '</mark>') AS snippet
FROM artifacts
JOIN artifacts_fts ON artifacts.rowid = artifacts_fts.rowid
WHERE artifacts_fts MATCH ?
ORDER BY artifacts_fts.rank
LIMIT ?
""", (query, limit))
return [dict(row) for row in cursor.fetchall()]
def get_by_type(self, artifact_type: str, limit: int = 50) -> list[dict]:
"""Get artifacts by type."""
cursor = self.conn.execute("""
SELECT * FROM artifacts
WHERE type = ?
ORDER BY created_at DESC
LIMIT ?
""", (artifact_type, limit))
return [dict(row) for row in cursor.fetchall()]
def find_similar_decisions(self, query: str) -> list[dict]:
"""Find past decisions similar to current situation."""
# type is not an FTS column, so filter after the full-text search
return [r for r in self.search(query, limit=50) if r["type"] == "decision"]
# Usage
index = ArtifactIndex(Path(".agent/artifact-index.db"))
# Index a decision
index.add({
"id": "dec-123",
"task_id": "task-456",
"type": "decision",
"title": "Use PostgreSQL over SQLite",
"content": "Chose PostgreSQL for production due to concurrent write requirements...",
"metadata": {"confidence": 0.9}
})
# Search later
similar = index.search("database choice concurrent writes")
Effort: Medium
Verdict: YES - Searchable history is powerful.
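The FTS5 trigger wiring can be verified in memory (this assumes an SQLite build with FTS5 enabled, which ships with most CPython distributions; the table columns are trimmed relative to the sketch):

```python
import sqlite3

# In-memory index; the AFTER INSERT trigger keeps the FTS shadow table in sync.
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.executescript("""
CREATE TABLE artifacts (id TEXT PRIMARY KEY, title TEXT, content TEXT);
CREATE VIRTUAL TABLE artifacts_fts USING fts5(
    title, content, content='artifacts', content_rowid='rowid'
);
CREATE TRIGGER artifacts_ai AFTER INSERT ON artifacts BEGIN
    INSERT INTO artifacts_fts(rowid, title, content)
    VALUES (new.rowid, new.title, new.content);
END;
""")
conn.execute("INSERT INTO artifacts VALUES (?, ?, ?)",
             ("dec-1", "Use PostgreSQL over SQLite", "Chosen for concurrent write requirements."))
conn.execute("INSERT INTO artifacts VALUES (?, ?, ?)",
             ("dec-2", "Pin Python 3.12", "CI images standardized on one interpreter."))

rows = conn.execute("""
    SELECT artifacts.id FROM artifacts
    JOIN artifacts_fts ON artifacts.rowid = artifacts_fts.rowid
    WHERE artifacts_fts MATCH ?
    ORDER BY artifacts_fts.rank
""", ("concurrent write",)).fetchall()
ids = [r["id"] for r in rows]
```

Note FTS5's default tokenizer does no stemming, so queries must match word forms (for example, "write" rather than "writes").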
What it is: Pre-implementation checks against precedent and best practices.
Current BLACKICE approach: Execute then check.
Why adopt: Catch issues before wasting execution time.
Implementation sketch:
@dataclass
class ValidationResult:
passed: bool
checks: list[tuple[str, bool, str]] # (check_name, passed, message)
blockers: list[str]
warnings: list[str]
class ValidationFunnel:
"""Pre-implementation validation pipeline."""
def __init__(self, artifact_index: ArtifactIndex, web_searcher):
self.index = artifact_index
self.web = web_searcher
async def validate(self, plan: Plan) -> ValidationResult:
"""Run all validation checks."""
checks = []
blockers = []
warnings = []
# Check 1: Precedent (RAG-judge)
precedent_check = await self._check_precedent(plan)
checks.append(("precedent", precedent_check.passed, precedent_check.message))
if not precedent_check.passed:
warnings.append(precedent_check.message)
# Check 2: Best practices (web search)
practices_check = await self._check_best_practices(plan)
checks.append(("best_practices", practices_check.passed, practices_check.message))
if not practices_check.passed:
warnings.append(practices_check.message)
# Check 3: Tech stack compatibility
stack_check = self._check_stack_compatibility(plan)
checks.append(("stack", stack_check.passed, stack_check.message))
if not stack_check.passed:
blockers.append(stack_check.message)
# Check 4: Resource constraints
resource_check = self._check_resources(plan)
checks.append(("resources", resource_check.passed, resource_check.message))
if not resource_check.passed:
blockers.append(resource_check.message)
return ValidationResult(
passed=len(blockers) == 0,
checks=checks,
blockers=blockers,
warnings=warnings
)
async def _check_precedent(self, plan: Plan):
"""Check if similar approaches succeeded before."""
similar = self.index.search(plan.summary, limit=5)
if not similar:
return Check(True, "No precedent found (novel approach)")
successes = [s for s in similar if s["metadata"].get("outcome") == "success"]
if len(successes) >= 3:
return Check(True, f"Found {len(successes)} successful precedents")
return Check(False, f"Only {len(successes)}/5 similar attempts succeeded")
async def _check_best_practices(self, plan: Plan):
"""Search for best practices and compare."""
results = await self.web.search(f"{plan.tech_stack} best practices {plan.domain}")
# LLM comparison of plan vs best practices
return Check(True, "Aligns with best practices")
Effort: Medium-High
Verdict: YES - Prevent issues before execution.
What it is: Store extended thinking per checkpoint for later recall.
Current BLACKICE approach: Only store outputs, not reasoning.
Why adopt: Recall WHY decisions were made. Debug bad choices.
Implementation sketch:
@dataclass
class ReasoningCapture:
checkpoint_id: str
timestamp: datetime
prompt: str
thinking: str # Extended thinking/chain-of-thought
decision: str
confidence: float
alternatives_considered: list[str]
class ReasoningStore:
"""Store and retrieve reasoning history."""
def __init__(self, base_path: Path):
self.base_path = base_path / ".git" / "claude" / "reasoning"
self.base_path.mkdir(parents=True, exist_ok=True)
def capture(self, commit_hash: str, reasoning: ReasoningCapture):
"""Store reasoning for a commit."""
path = self.base_path / commit_hash / "reasoning.md"
path.parent.mkdir(parents=True, exist_ok=True)
content = f"""# Reasoning for {commit_hash}
## Timestamp
{reasoning.timestamp.isoformat()}
## Prompt
{reasoning.prompt}
## Thinking Process
{reasoning.thinking}
## Decision
{reasoning.decision}
## Confidence
{reasoning.confidence:.0%}
## Alternatives Considered
{chr(10).join(f"- {a}" for a in reasoning.alternatives_considered)}
"""
path.write_text(content)
def recall(self, query: str) -> list[ReasoningCapture]:
"""Find past reasoning related to query."""
# Search through stored reasoning
results = []
for path in self.base_path.rglob("reasoning.md"):
content = path.read_text()
if query.lower() in content.lower():
results.append(self._parse(content))
return results
Effort: Low-Medium
Verdict: YES - Debuggable decision history.
Why skip: External dependency. BLACKICE's Beads is sufficient.
Why skip: Paid tool. Use open alternatives.
| Feature | Worth Adopting? | Effort | Priority |
|---|---|---|---|
| Handoff System | YES | Medium | High |
| Continuity Ledger | YES | Medium | High |
| Artifact Index (FTS5) | YES | Medium | Medium |
| Validation Funnel | YES | Medium | Medium |
| Reasoning History | YES | Low | Low |
<!-- Source Gist 10 of 19: 0eb96e20ff00f58bce4b0a99c4abe06c -->
Claude-Code-Safety-Net Ideas for BLACKICE
Ideas from Claude Code Safety Net for BLACKICE.
A plugin that prevents AI agents from executing destructive commands by intercepting bash operations before execution.
| Aspect | Safety Net | BLACKICE |
|---|---|---|
| Focus | Prevent destructive commands | Iterate-until-success |
| Method | Semantic command analysis | SafetyGuard policies |
| Scope | User + Project config | Global config |
| Analysis | Parse flags, unwrap shells | Pattern matching |
- Semantic Command Analysis - Parses arguments, understands flag combinations
- Shell Wrapper Detection - Recursively analyzes bash -c, sh -c wrappers
- Dual-Scope Config - User-level + project-level rules
- Fail-Safe Defaults - Falls back to built-in protections on config errors
- Paranoid Mode - Extra strict restrictions
What it is: Parse flags and understand dangerous combinations, not just prefixes.
Current BLACKICE approach: Simple pattern matching.
Why adopt: git checkout -b is safe, git checkout -- is dangerous. Need to understand context.
Implementation sketch:
import shlex
from dataclasses import dataclass
@dataclass
class ParsedCommand:
executable: str
subcommand: str | None
flags: list[str]
args: list[str]
raw: str
class CommandParser:
"""Parse commands into structured format."""
def parse(self, command: str) -> ParsedCommand:
parts = shlex.split(command)
if not parts:
return ParsedCommand("", None, [], [], command)
executable = parts[0]
flags = [p for p in parts[1:] if p.startswith("-")]
args = [p for p in parts[1:] if not p.startswith("-")]
subcommand = args[0] if args and not args[0].startswith("/") else None
return ParsedCommand(
executable=executable,
subcommand=subcommand,
flags=flags,
args=args,
raw=command
)
@dataclass
class DangerRule:
executable: str
subcommand: str | None
dangerous_flags: list[str]
safe_flags: list[str] # These make it safe even with dangerous flags
reason: str
DANGER_RULES = [
DangerRule(
executable="git",
subcommand="checkout",
dangerous_flags=["--"],
safe_flags=["-b", "-B"], # Creating branch is safe
reason="Discards uncommitted changes"
),
DangerRule(
executable="git",
subcommand="reset",
dangerous_flags=["--hard"],
safe_flags=[],
reason="Destroys uncommitted work"
),
DangerRule(
executable="git",
subcommand="push",
dangerous_flags=["--force", "-f"],
safe_flags=["--force-with-lease"], # Safer variant
reason="Rewrites remote history"
),
DangerRule(
executable="rm",
subcommand=None,
dangerous_flags=["-rf", "-r", "-f"],
safe_flags=[],
reason="Permanent deletion"
),
]
class SemanticAnalyzer:
"""Analyze commands semantically."""
def __init__(self, rules: list[DangerRule]):
self.rules = rules
self.parser = CommandParser()
def analyze(self, command: str) -> tuple[bool, str | None]:
"""Returns (is_safe, reason if unsafe)."""
parsed = self.parser.parse(command)
for rule in self.rules:
if parsed.executable != rule.executable:
continue
if rule.subcommand and parsed.subcommand != rule.subcommand:
continue
# Check for safe flags first
if any(sf in parsed.flags for sf in rule.safe_flags):
continue
# Check for dangerous flags
if any(df in parsed.flags for df in rule.dangerous_flags):
return False, rule.reason
return True, None
Effort: Medium
Verdict: YES - Much safer than regex matching.
What it is: Recursively unwrap bash -c, sh -c, python -c to analyze hidden commands.
Current BLACKICE approach: Analyze surface command only.
Why adopt: Agents can hide dangerous commands in shell wrappers.
Implementation sketch:
class ShellUnwrapper:
"""Recursively unwrap shell commands."""
SHELL_WRAPPERS = {
"bash": ["-c"],
"sh": ["-c"],
"zsh": ["-c"],
"python": ["-c"],
"python3": ["-c"],
"node": ["-e"],
"perl": ["-e"],
}
def unwrap(self, command: str) -> list[str]:
"""Extract all nested commands."""
commands = [command]
parts = shlex.split(command)
if len(parts) < 2:
return commands
executable = parts[0]
if executable not in self.SHELL_WRAPPERS:
return commands
# Check for shell execution flags
for i, part in enumerate(parts[1:], 1):
if part in self.SHELL_WRAPPERS[executable]:
# Next part is the command
if i + 1 < len(parts):
inner_command = parts[i + 1]
# Recursively unwrap
commands.extend(self.unwrap(inner_command))
break
return commands
def analyze_all(self, command: str, analyzer: SemanticAnalyzer) -> tuple[bool, str | None]:
"""Analyze command and all nested commands."""
all_commands = self.unwrap(command)
for cmd in all_commands:
is_safe, reason = analyzer.analyze(cmd)
if not is_safe:
return False, f"Nested command '{cmd}': {reason}"
return True, None
# Example
unwrapper = ShellUnwrapper()
# This will detect the dangerous rm inside bash -c
command = 'bash -c "rm -rf /important/data"'
commands = unwrapper.unwrap(command)
# Returns: ['bash -c "rm -rf /important/data"', 'rm -rf /important/data']
Effort: Low-Medium
Verdict: YES - Critical for security.
What it is: User-level defaults + project-level overrides.
Current BLACKICE approach: Global config only.
Why adopt: Different projects have different safety needs.
Implementation sketch:
import json
from dataclasses import dataclass
from pathlib import Path
@dataclass
class SafetyConfig:
blocked_commands: list[DangerRule]
allowed_paths: list[str] # Safe to delete in these paths
strict_mode: bool
paranoid_mode: bool
class ConfigLoader:
"""Load safety config from multiple scopes."""
USER_CONFIG = Path("~/.cc-safety-net/config.json").expanduser()
PROJECT_CONFIG = Path(".safety-net.json")
def load(self) -> SafetyConfig:
"""Load and merge configs (project wins on conflicts)."""
user_config = self._load_file(self.USER_CONFIG)
project_config = self._load_file(self.PROJECT_CONFIG)
return self._merge(user_config, project_config)
def _load_file(self, path: Path) -> dict:
if not path.exists():
return {}
try:
return json.loads(path.read_text())
except json.JSONDecodeError:
# Fail safe: return empty, don't crash
return {}
def _merge(self, user: dict, project: dict) -> SafetyConfig:
"""Project config overrides user config."""
merged = {**user, **project}
# Special handling: blocked_commands are additive
blocked = user.get("blocked_commands", []) + project.get("blocked_commands", [])
merged["blocked_commands"] = self._dedupe_rules(blocked)
return SafetyConfig(**merged)
# Project-specific config example
# .safety-net.json
{
"allowed_paths": ["/tmp", "./build", "./dist"],
"blocked_commands": [
{
"executable": "docker",
"subcommand": "system prune",
"reason": "Don't clean Docker in this project"
}
],
"strict_mode": true
}
Effort: Low
Verdict: YES - Flexible, safe defaults.
What it is: If config is malformed, fall back to built-in protections.
Current BLACKICE approach: Crash on bad config.
Why adopt: Safety should never be compromised by config errors.
Implementation sketch:
DEFAULT_BLOCKED = [
DangerRule("rm", None, ["-rf"], [], "Permanent deletion"),
DangerRule("git", "push", ["--force"], ["--force-with-lease"], "Force push"),
DangerRule("git", "reset", ["--hard"], [], "Hard reset"),
DangerRule("git", "clean", ["-f"], [], "Clean untracked"),
DangerRule("chmod", None, ["777"], [], "Insecure permissions"),
DangerRule("curl", None, ["|", "bash"], [], "Pipe to shell"),
]
class SafetyGuard:
"""Guard with fail-safe defaults."""
def __init__(self, config_path: Path = None):
self.config = self._load_config_safely(config_path)
def _load_config_safely(self, path: Path) -> SafetyConfig:
"""Load config, fall back to defaults on any error."""
try:
if path and path.exists():
data = json.loads(path.read_text())
return SafetyConfig(**data)
except Exception as e:
# Log but don't crash
logger.warning(f"Config error, using defaults: {e}")
# Return safe defaults
return SafetyConfig(
blocked_commands=DEFAULT_BLOCKED,
allowed_paths=["/tmp", "/var/tmp"],
strict_mode=False,
paranoid_mode=False
)
def check(self, command: str) -> tuple[bool, str | None]:
"""Check if command is safe. Always returns valid result."""
try:
return self._analyze(command)
except Exception as e:
# On any analysis error, block the command
logger.error(f"Analysis error, blocking: {e}")
return False, "Analysis failed - blocked for safety"
Effort: Low
Verdict: YES - Defense in depth.
What it is: Extra-strict mode that blocks even slightly risky operations.
Current BLACKICE approach: Single strictness level.
Why adopt: High-security environments need extra protection.
Implementation sketch:
class ParanoidGuard(SafetyGuard):
"""Extra-strict safety guard."""
PARANOID_RULES = [
# Block ALL interpreter one-liners
DangerRule("python", None, ["-c"], [], "Interpreter execution"),
DangerRule("node", None, ["-e"], [], "Interpreter execution"),
DangerRule("perl", None, ["-e"], [], "Interpreter execution"),
# Block network operations
DangerRule("curl", None, [], [], "Network fetch"),
DangerRule("wget", None, [], [], "Network fetch"),
# Block any rm (not just -rf)
DangerRule("rm", None, [], [], "Any deletion"),
# Block sudo entirely
DangerRule("sudo", None, [], [], "Elevated privileges"),
]
def __init__(self, config_path: Path | None = None):
super().__init__(config_path)
if self.config.paranoid_mode:
self.config.blocked_commands.extend(self.PARANOID_RULES)
# Usage
guard = ParanoidGuard()  # Enable with config: {"paranoid_mode": true}
Effort: Low
Verdict: YES - Options for high-security environments.
Why skip: BLACKICE should remain model-agnostic.
| Feature | Worth Adopting? | Effort | Priority |
|---|---|---|---|
| Semantic Command Analysis | YES | Medium | High |
| Shell Wrapper Detection | YES | Low | High |
| Fail-Safe Defaults | YES | Low | High |
| Dual-Scope Configuration | YES | Low | Medium |
| Paranoid Mode | YES | Low | Low |
<!-- Source Gist 11 of 19: bdf398007302c18632c1784c8e092ac3 -->
Claude-Workflow-v2 Ideas for BLACKICE
Ideas from Claude Workflow v2 for BLACKICE.
A comprehensive Claude Code plugin with 7 specialized agents, 17 commands, 6 skills, and 9 hooks for intelligent software development workflows.
| Aspect | Claude Workflow v2 | BLACKICE |
|---|---|---|
| Focus | Agent orchestration via plugins | Iterate-until-success |
| Agents | 7 specialized (reviewer, debugger, etc.) | Supervisor + Workers |
| Config | Markdown files | Python/YAML |
| Hooks | 9 types (security, formatting, etc.) | Limited hooks |
- 7 Specialized Agents - Orchestrator, code-reviewer, debugger, docs-writer, security-auditor, refactorer, test-architect
- Proactive Agent Spawning - Context-triggered activation
- Multi-Step Commands - `/commit-push-pr` chains operations
- Skill-Based Knowledge - External files provide domain guidance
- Cascading Verification - Parallel sub-agent validation
What it is: Agents activate based on context, not explicit commands.
Current BLACKICE approach: Explicit agent selection.
Why adopt: Friction-free workflows. Agent selection becomes automatic.
Implementation sketch:
@dataclass
class ActivationTrigger:
agent_type: str
patterns: list[str] # Regex patterns
keywords: list[str]
context_requirements: list[str] # e.g., "has_test_files"
ACTIVATION_TRIGGERS = [
ActivationTrigger(
agent_type="code_reviewer",
patterns=[r"review\s+(?:this|the|my)\s+code", r"check\s+for\s+issues"],
keywords=["review", "audit", "check quality"],
context_requirements=[]
),
ActivationTrigger(
agent_type="security_auditor",
patterns=[r"security\s+(?:check|audit|scan)", r"vulnerabilit"],
keywords=["security", "vulnerability", "cve"],
context_requirements=[]
),
ActivationTrigger(
agent_type="test_architect",
patterns=[r"write\s+tests?", r"add\s+(?:unit\s+)?tests?"],
keywords=["test", "coverage", "tdd"],
context_requirements=["has_source_files"]
),
ActivationTrigger(
agent_type="refactorer",
patterns=[r"refactor", r"clean\s*up", r"restructure"],
keywords=["refactor", "cleanup", "improve"],
context_requirements=["has_source_files"]
),
]
class ProactiveSpawner:
"""Spawn agents based on context."""
def __init__(self, triggers: list[ActivationTrigger]):
self.triggers = triggers
def detect_agent(self, user_message: str, context: dict) -> str | None:
"""Detect which agent should handle this request."""
message_lower = user_message.lower()
for trigger in self.triggers:
# Check keywords
if any(kw in message_lower for kw in trigger.keywords):
if self._check_context(trigger, context):
return trigger.agent_type
# Check patterns
for pattern in trigger.patterns:
if re.search(pattern, message_lower):
if self._check_context(trigger, context):
return trigger.agent_type
return None # Use default agent
def _check_context(self, trigger: ActivationTrigger, context: dict) -> bool:
for req in trigger.context_requirements:
if not context.get(req, False):
return False
return True
# Usage
spawner = ProactiveSpawner(ACTIVATION_TRIGGERS)
agent_type = spawner.detect_agent(user_message, {"has_source_files": True})
if agent_type:
agent = spawn_agent(agent_type)
Effort: Low-Medium
Verdict: YES - Better UX than manual selection.
What it is: Single command triggers multiple sequential operations.
Current BLACKICE approach: Individual commands.
Why adopt: Common workflows in one command. Less friction.
Implementation sketch:
@dataclass
class CommandStep:
name: str
command: str
args: dict
on_failure: Literal["abort", "continue", "skip"]
@dataclass
class CommandChain:
name: str
description: str
steps: list[CommandStep]
COMMAND_CHAINS = {
"commit-push-pr": CommandChain(
name="commit-push-pr",
description="Stage, commit, push, and create PR",
steps=[
CommandStep("stage", "git add", {"files": "."}, "abort"),
CommandStep("commit", "git commit", {"message": "{message}"}, "abort"),
CommandStep("push", "git push", {"branch": "{branch}"}, "abort"),
CommandStep("pr", "gh pr create", {"title": "{title}"}, "continue"),
]
),
"test-fix-commit": CommandChain(
name="test-fix-commit",
description="Run tests, fix failures, commit fixes",
steps=[
CommandStep("test", "pytest", {}, "continue"),
CommandStep("fix", "agent:fix_failures", {}, "abort"),
CommandStep("retest", "pytest", {}, "abort"),
CommandStep("commit", "git commit", {"message": "fix: test failures"}, "continue"),
]
),
"review-merge": CommandChain(
name="review-merge",
description="Review PR and merge if approved",
steps=[
CommandStep("checkout", "git checkout", {"pr": "{pr_number}"}, "abort"),
CommandStep("review", "agent:code_review", {}, "abort"),
CommandStep("approve", "gh pr review --approve", {}, "abort"),
CommandStep("merge", "gh pr merge", {}, "abort"),
]
),
}
class ChainExecutor:
"""Execute command chains."""
async def execute(self, chain_name: str, params: dict) -> ChainResult:
chain = COMMAND_CHAINS[chain_name]
results = []
for step in chain.steps:
# Substitute parameters
args = {k: v.format(**params) if isinstance(v, str) else v
for k, v in step.args.items()}
try:
if step.command.startswith("agent:"):
result = await self._run_agent(step.command[6:], args)
else:
result = await self._run_command(step.command, args)
results.append((step.name, "success", result))
except Exception as e:
results.append((step.name, "failed", str(e)))
match step.on_failure:
case "abort":
return ChainResult(status="aborted", step=step.name, results=results)
case "skip":
continue
case "continue":
continue
return ChainResult(status="success", results=results)
Effort: Low
Verdict: YES - Workflow efficiency.
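The `{placeholder}` substitution step in ChainExecutor can be shown in isolation; non-string arguments pass through untouched:

```python
steps = [
    ("commit", "git commit", {"message": "{message}"}),
    ("push", "git push", {"branch": "{branch}", "force": False}),
]
params = {"message": "feat: add login", "branch": "main"}

resolved = [
    (name, cmd, {k: v.format(**params) if isinstance(v, str) else v
                 for k, v in args.items()})
    for name, cmd, args in steps
]
# each step's string args now carry the concrete values
```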
What it is: Spawn parallel sub-agents for verification (build, test, lint, security).
Current BLACKICE approach: Sequential verification.
Why adopt: Faster verification. Independent failure detection.
Implementation sketch:
@dataclass
class Verifier:
name: str
agent_type: str
timeout: float
critical: bool # If True, failure blocks merge
VERIFIERS = [
Verifier("build", "build_validator", 300, True),
Verifier("test", "test_runner", 600, True),
Verifier("lint", "lint_checker", 60, False),
Verifier("security", "security_scanner", 120, True),
Verifier("types", "type_checker", 120, False),
]
class CascadingVerifier:
"""Run multiple verifiers in parallel."""
async def verify_all(self, changes: list[Path]) -> VerificationReport:
"""Run all verifiers in parallel."""
tasks = [
self._run_verifier(v, changes)
for v in VERIFIERS
]
results = await asyncio.gather(*tasks, return_exceptions=True)
report = VerificationReport()
for verifier, result in zip(VERIFIERS, results):
if isinstance(result, Exception):
report.add_failure(verifier.name, str(result), verifier.critical)
elif not result.passed:
report.add_failure(verifier.name, result.message, verifier.critical)
else:
report.add_success(verifier.name, result.message)
return report
async def _run_verifier(self, verifier: Verifier, changes: list[Path]):
"""Run single verifier with timeout."""
agent = spawn_agent(verifier.agent_type)
try:
return await asyncio.wait_for(
agent.verify(changes),
timeout=verifier.timeout
)
except asyncio.TimeoutError:
return VerifierResult(passed=False, message=f"Timeout after {verifier.timeout}s")
@dataclass
class VerificationReport:
successes: list[tuple[str, str]] = field(default_factory=list)
failures: list[tuple[str, str, bool]] = field(default_factory=list)
@property
def can_proceed(self) -> bool:
"""True if no critical failures."""
return not any(critical for _, _, critical in self.failures)
def format_summary(self) -> str:
lines = ["## Verification Report", ""]
for name, msg in self.successes:
lines.append(f"✅ {name}: {msg}")
for name, msg, critical in self.failures:
marker = "❌" if critical else "⚠️"
lines.append(f"{marker} {name}: {msg}")
return "\n".join(lines)
Effort: Medium
Verdict: YES - Parallel verification is faster.
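A stripped-down version of the parallel-with-timeout pattern, using stub coroutines in place of spawned verifier agents:

```python
import asyncio

async def run_verifier(name: str, coro, timeout: float):
    """Run one verifier; a timeout is reported as a failure, not an exception."""
    try:
        return name, True, await asyncio.wait_for(coro, timeout)
    except asyncio.TimeoutError:
        return name, False, f"Timeout after {timeout}s"

async def lint():
    return "clean"

async def slow_build():
    await asyncio.sleep(1.0)
    return "built"

async def verify_all():
    return await asyncio.gather(
        run_verifier("lint", lint(), 5.0),
        run_verifier("build", slow_build(), 0.05),  # deliberately too short
    )

results = asyncio.run(verify_all())
# lint succeeds; build is reported as a timeout failure
```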
What it is: Commands declare which tools they're allowed to use.
Current BLACKICE approach: All-or-nothing access.
Why adopt: Principle of least privilege. Safer execution.
Implementation sketch:
@dataclass
class ToolPermission:
tool: str
allowed_patterns: list[str] # Allowed argument patterns
@dataclass
class CommandPermissions:
allowed_tools: list[ToolPermission]
denied_tools: list[str]
COMMAND_PERMISSIONS = {
"code_review": CommandPermissions(
allowed_tools=[
ToolPermission("Read", ["*"]),
ToolPermission("Grep", ["*"]),
ToolPermission("Bash", ["git diff*", "git log*", "git show*"]),
],
denied_tools=["Write", "Edit", "Bash:rm*", "Bash:git push*"]
),
"refactor": CommandPermissions(
allowed_tools=[
ToolPermission("Read", ["*"]),
ToolPermission("Write", ["*.py", "*.ts", "*.js"]),
ToolPermission("Edit", ["*.py", "*.ts", "*.js"]),
ToolPermission("Bash", ["git diff*", "pytest*", "npm test*"]),
],
denied_tools=["Bash:rm -rf*", "Bash:git push*"]
),
}
class PermissionEnforcer:
def check(self, command: str, tool: str, args: dict) -> bool:
perms = COMMAND_PERMISSIONS.get(command)
if not perms:
return True # No restrictions
# Check denied first
for denied in perms.denied_tools:
if self._matches(tool, args, denied):
return False
# Check allowed
for allowed in perms.allowed_tools:
if allowed.tool == tool:
if self._args_match(args, allowed.allowed_patterns):
return True
return False  # Not in allowed list
Effort: Low
Verdict: YES - Security best practice.
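The sketch leaves `_matches` undefined; one plausible implementation, assuming denied entries are either a bare tool name or `Tool:glob` strings, uses `fnmatch`:

```python
from fnmatch import fnmatch

def matches_denied(tool: str, command_line: str, denied: str) -> bool:
    """Match 'Bash:rm -rf*'-style entries; a bare name denies the whole tool."""
    if ":" in denied:
        d_tool, d_pattern = denied.split(":", 1)
        return tool == d_tool and fnmatch(command_line, d_pattern)
    return tool == denied

blocked = matches_denied("Bash", "rm -rf /tmp/build", "Bash:rm -rf*")
allowed = matches_denied("Bash", "git diff HEAD~1", "Bash:rm -rf*")
```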
Why skip: BLACKICE uses Python/YAML, which is more powerful.
Why skip: BLACKICE should remain model-agnostic.
| Feature | Worth Adopting? | Effort | Priority |
|---|---|---|---|
| Proactive Agent Spawning | YES | Medium | High |
| Multi-Step Command Chains | YES | Low | High |
| Cascading Verification | YES | Medium | Medium |
| Tool Permission Scoping | YES | Low | Medium |
<!-- Source Gist 12 of 19: 8c529b7bfea515a8a09db9ed5de4327c -->
Acontext Ideas for BLACKICE
Ideas from Acontext for BLACKICE.
A context data platform for storing, observing, and optimizing AI agent performance with unified storage and self-learning capabilities.
| Aspect | Acontext | BLACKICE |
|---|---|---|
| Focus | Context storage & learning | Iterate-until-success |
| Storage | PostgreSQL + Redis + S3 | SQLite (Beads) |
| Learning | Experience agent + SOPs | Reflexion |
| API | FastAPI | CLI |
- Unified Message Storage - Multi-provider LLM message persistence
- Background Task Extraction - Automatic TODO detection from conversations
- Experience Agent - Learns from successful completions
- SOP Generation - Creates reusable procedures from patterns
- Artifact Storage - S3-backed file management
What it is: Automatically extract TODOs and action items from agent conversations.
Current BLACKICE approach: Manual task tracking.
Why adopt: Don't lose tasks mentioned in conversation. Automatic backlog population.
Implementation sketch:
import re
@dataclass
class ExtractedTask:
description: str
source: str # Which message it came from
priority: Literal["high", "medium", "low"]
due: str | None
class TaskExtractor:
"""Extract tasks from agent conversations."""
TODO_PATTERNS = [
r"TODO:\s*(.+)",
r"FIXME:\s*(.+)",
r"(?:need to|should|must|have to)\s+(.+?)(?:\.|$)",
r"(?:later|next|afterwards?),?\s+(.+?)(?:\.|$)",
r"don't forget to\s+(.+?)(?:\.|$)",
]
PRIORITY_KEYWORDS = {
"high": ["urgent", "critical", "asap", "immediately", "blocking"],
"medium": ["soon", "important", "should"],
"low": ["eventually", "nice to have", "when possible"],
}
async def extract_from_conversation(self, messages: list[Message]) -> list[ExtractedTask]:
"""Extract all tasks from conversation history."""
tasks = []
for msg in messages:
content = msg.content.lower()
for pattern in self.TODO_PATTERNS:
matches = re.findall(pattern, content, re.IGNORECASE)
for match in matches:
tasks.append(ExtractedTask(
description=match.strip(),
source=msg.id,
priority=self._detect_priority(match),
due=self._detect_due_date(match)
))
# Deduplicate similar tasks
return self._deduplicate(tasks)
def _detect_priority(self, text: str) -> str:
text_lower = text.lower()
for priority, keywords in self.PRIORITY_KEYWORDS.items():
if any(kw in text_lower for kw in keywords):
return priority
return "medium"
async def monitor_and_extract(self, beads: BeadsClient):
"""Background task that monitors for new tasks."""
async for event in beads.subscribe("message_added"):
tasks = await self.extract_from_conversation([event.message])
for task in tasks:
await beads.append_event("task_extracted", task.__dict__)
Effort: Medium
Verdict: YES - Automatic task discovery is valuable.
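Two of the patterns above, run against a sample message (simplified, without priority or due-date detection):

```python
import re

TODO_PATTERNS = [
    r"TODO:\s*(.+)",
    r"don't forget to\s+(.+?)(?:\.|$)",
]

def extract(text: str) -> list[str]:
    """Collect every pattern match across the message."""
    found = []
    for pattern in TODO_PATTERNS:
        found += [m.strip() for m in re.findall(pattern, text, re.IGNORECASE)]
    return found

msg = "TODO: add retry logic\nAlso don't forget to update the changelog."
tasks = extract(msg)
```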
What it is: When agent successfully completes a task type multiple times, generate reusable SOP.
Current BLACKICE approach: Reflexion learns but doesn't formalize.
Why adopt: Turn implicit learning into explicit, shareable procedures.
Implementation sketch:
@dataclass
class SOP:
id: str
task_type: str
title: str
steps: list[str]
prerequisites: list[str]
success_criteria: list[str]
source_tasks: list[str] # Tasks that contributed to this SOP
confidence: float
class SOPGenerator:
"""Generate SOPs from successful task patterns."""
def __init__(self, beads: BeadsClient, llm: LLMAdapter):
self.beads = beads
self.llm = llm
async def find_candidates(self, min_successes: int = 3) -> list[str]:
"""Find task types with enough successes to generate SOP."""
query = """
SELECT task_type, COUNT(*) as success_count
FROM tasks
WHERE status = 'success'
GROUP BY task_type
HAVING COUNT(*) >= ?
"""
return await self.beads.query(query, (min_successes,))
async def generate_sop(self, task_type: str) -> SOP:
"""Generate SOP from successful task executions."""
# Get successful task traces
traces = await self.beads.get_traces(
task_type=task_type,
status="success",
limit=10
)
# Extract common patterns using LLM
prompt = f"""
Analyze these successful task executions and extract a reusable Standard Operating Procedure.
Task type: {task_type}
Successful executions:
{json.dumps([t.summary for t in traces], indent=2)}
Generate an SOP with:
1. Prerequisites (what must be true before starting)
2. Steps (ordered actions to take)
3. Success criteria (how to know it's done)
Format as JSON matching this schema:
{{
"title": "string",
"prerequisites": ["string"],
"steps": ["string"],
"success_criteria": ["string"]
}}
"""
response = await self.llm.generate(prompt)
sop_data = json.loads(response)
return SOP(
id=f"sop-{task_type}-{uuid4().hex[:8]}",
task_type=task_type,
title=sop_data["title"],
steps=sop_data["steps"],
prerequisites=sop_data["prerequisites"],
success_criteria=sop_data["success_criteria"],
source_tasks=[t.id for t in traces],
confidence=len(traces) / 10 # More sources = higher confidence
)
async def apply_sop(self, task: Task) -> str:
"""Inject relevant SOP into task prompt."""
sop = await self.beads.get_sop(task.task_type)
if not sop:
return task.description
return f"""
## Standard Operating Procedure: {sop.title}
### Prerequisites
{chr(10).join(f"- {p}" for p in sop.prerequisites)}
### Recommended Steps
{chr(10).join(f"{i+1}. {s}" for i, s in enumerate(sop.steps))}
### Success Criteria
{chr(10).join(f"- {c}" for c in sop.success_criteria)}
---
## Your Task
{task.description}
Follow the SOP above unless the task requires deviation.
"""
Effort: Medium
Verdict: YES - Formalized learning is powerful.
What it is: Manage generated outputs through file paths, not inline content.
Current BLACKICE approach: Code in Beads events.
Why adopt: Large artifacts bloat context. File references are lightweight.
Implementation sketch:
@dataclass
class Artifact:
id: str
task_id: str
type: Literal["code", "config", "docs", "test", "other"]
path: Path
size_bytes: int
created_at: datetime
metadata: dict
class ArtifactStore:
"""Store and retrieve task artifacts."""
def __init__(self, base_path: Path, s3_client=None):
self.base_path = base_path
self.s3 = s3_client # Optional cloud backup
async def save(self, task_id: str, content: str, artifact_type: str, filename: str) -> Artifact:
"""Save artifact and return reference."""
artifact_dir = self.base_path / task_id
artifact_dir.mkdir(parents=True, exist_ok=True)
path = artifact_dir / filename
path.write_text(content)
artifact = Artifact(
id=str(uuid4()),
task_id=task_id,
type=artifact_type,
path=path,
size_bytes=len(content.encode()),
created_at=datetime.now(),
metadata={"original_filename": filename}
)
# Optional: backup to S3
if self.s3:
await self.s3.upload(str(path), f"artifacts/{task_id}/{filename}")
return artifact
async def get_summary(self, task_id: str) -> str:
"""Get lightweight summary of artifacts (not full content)."""
artifacts = await self.list(task_id)
lines = ["## Generated Artifacts", ""]
for a in artifacts:
lines.append(f"- `{a.path.name}` ({a.type}, {a.size_bytes} bytes)")
return "\n".join(lines)
def get_reference_for_prompt(self, artifact: Artifact) -> str:
"""Get artifact reference for agent prompt (not full content)."""
return f"[Artifact: {artifact.path.name}] - Use `read_file` tool to access"
Effort: Low
Verdict: YES - Keep context lean.
What it is: Background agent monitors session health without explicit config.
Current BLACKICE approach: Manual monitoring.
Why adopt: Automatic detection of stuck sessions, runaway costs, etc.
Implementation sketch:
@dataclass
class SessionHealth:
session_id: str
status: Literal["healthy", "warning", "critical"]
issues: list[str]
metrics: dict
class SessionMonitor:
"""Monitor session health in background."""
HEALTH_CHECKS = [
("iteration_stuck", lambda s: s.current_iteration == s.last_iteration and s.idle_time > 60),
("cost_warning", lambda s: s.token_cost > s.budget * 0.8),
("cost_critical", lambda s: s.token_cost > s.budget),
("loop_detected", lambda s: s.repeated_outputs > 3),
("error_rate_high", lambda s: s.error_count / max(s.iteration_count, 1) > 0.5),
]
async def check(self, session: Session) -> SessionHealth:
issues = []
for check_name, check_fn in self.HEALTH_CHECKS:
try:
if check_fn(session):
issues.append(check_name)
except Exception:
pass
status = "healthy"
if any("critical" in i for i in issues):
status = "critical"
elif issues:
status = "warning"
return SessionHealth(
session_id=session.id,
status=status,
issues=issues,
metrics={
"iterations": session.iteration_count,
"tokens": session.token_count,
"errors": session.error_count,
"idle_seconds": session.idle_time,
}
)
async def monitor_loop(self, beads: BeadsClient):
"""Background monitoring loop."""
while True:
active_sessions = await beads.get_active_sessions()
for session in active_sessions:
health = await self.check(session)
if health.status != "healthy":
await self._alert(health)
await asyncio.sleep(10)
Effort: Medium
Verdict: YES - Proactive health monitoring.
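The table-of-checks pattern works on any object with the expected attributes; a SimpleNamespace stub is enough to exercise it:

```python
from types import SimpleNamespace

HEALTH_CHECKS = [
    ("cost_warning",  lambda s: s.token_cost > s.budget * 0.8),
    ("cost_critical", lambda s: s.token_cost > s.budget),
    ("loop_detected", lambda s: s.repeated_outputs > 3),
]

def issues_for(session) -> list[str]:
    """Collect the names of all tripped checks; a broken check is skipped."""
    issues = []
    for name, check in HEALTH_CHECKS:
        try:
            if check(session):
                issues.append(name)
        except Exception:
            pass  # a faulty check must never take down the monitor
    return issues

session = SimpleNamespace(token_cost=120, budget=100, repeated_outputs=1)
found = issues_for(session)
# both cost thresholds trip; no loop detected
```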
Why skip: BLACKICE's SQLite (Beads) is simpler and sufficient.
Why skip: BLACKICE already has adapter pattern.
| Feature | Worth Adopting? | Effort | Priority |
|---|---|---|---|
| Background Task Extraction | YES | Medium | High |
| SOP Generation | YES | Medium | Medium |
| Artifact-Centric Storage | YES | Low | Medium |
| Session Health Monitoring | YES | Medium | Medium |
<!-- Source Gist 13 of 19: 752c3748a1282907105c8e2e233393d2 -->
Planning-with-Files Ideas for BLACKICE
Ideas from Planning with Files for BLACKICE.
A Claude Code skill implementing persistent markdown-based planning. Uses filesystem as memory to prevent goal drift.
| Aspect | Planning with Files | BLACKICE |
|---|---|---|
| Focus | Persistent task state | Iterate-until-success |
| Memory | Markdown files | Beads event store |
| Pattern | 3-file system | Event replay |
| Inspiration | Manus agent | Ralph Loop |
- Filesystem as Memory - Files persist state, not context window
- 3-File Pattern - `task_plan.md`, `notes.md`, deliverable
- Attention Recovery - Re-read plan before decisions
- Append-Only Notes - Never modify historical entries
- Goal Tracking - Checkbox-based progress visibility
What it is: Re-read objectives before every major decision.
Current BLACKICE approach: Hope agent remembers goals.
Why adopt: Prevents drift over long sessions. Manus uses this for 50+ tool calls.
Implementation sketch:
class AttentionManager:
"""Force agent to re-read objectives periodically."""
def __init__(self, task: Task, interval: int = 5):
self.task = task
self.interval = interval # Re-read every N tool calls
self.tool_call_count = 0
self.plan_path = Path(f".agent/{task.id}/task_plan.md")
def before_tool_call(self, tool: str, args: dict) -> str | None:
"""Check if we need attention recovery."""
self.tool_call_count += 1
if self.tool_call_count % self.interval == 0:
return self._get_attention_prompt()
return None
def _get_attention_prompt(self) -> str:
plan = self.plan_path.read_text() if self.plan_path.exists() else ""
return f"""
⚠️ ATTENTION CHECK (call #{self.tool_call_count})
Before proceeding, re-read your objectives:
{plan}
Current phase: {self._get_current_phase(plan)}
Remaining tasks: {self._count_remaining(plan)}
Continue with your next action, keeping these objectives in mind.
"""
def _get_current_phase(self, plan: str) -> str:
# Find first unchecked phase
for line in plan.split("\n"):
if line.startswith("- [ ]"):
return line.replace("- [ ]", "").strip()
return "All phases complete"
def _count_remaining(self, plan: str) -> int:
return plan.count("- [ ]")
# Usage in execution loop
attention = AttentionManager(task)
for tool_call in agent.tool_calls:
attention_prompt = attention.before_tool_call(tool_call.tool, tool_call.args)
if attention_prompt:
await agent.inject_context(attention_prompt)
await execute_tool(tool_call)
Effort: Low
Verdict: YES - Simple, effective drift prevention.
What it is: Separate concerns into plan, notes, and output files.
Current BLACKICE approach: Everything in Beads events.
Why adopt: Human-readable state. Easy debugging. Agent can re-read naturally.
Implementation sketch:
@dataclass
class TaskWorkspace:
"""3-file workspace for task state."""
task_id: str
base_path: Path = Path(".agent")
@property
def plan_path(self) -> Path:
return self.base_path / self.task_id / "task_plan.md"
@property
def notes_path(self) -> Path:
return self.base_path / self.task_id / "notes.md"
@property
def output_path(self) -> Path:
return self.base_path / self.task_id / "output.md"
def init(self, task: Task):
"""Initialize workspace with plan template."""
self.plan_path.parent.mkdir(parents=True, exist_ok=True)
plan_template = f"""# Task Plan: {task.name}
## Objective
{task.description}
## Phases
- [ ] Phase 1: Research and understand requirements
- [ ] Phase 2: Design solution approach
- [ ] Phase 3: Implement solution
- [ ] Phase 4: Test and validate
- [ ] Phase 5: Document and deliver
## Success Criteria
{task.success_criteria or "Task completed successfully"}
## Progress Log
<!-- Updated by agent after each phase -->
"""
self.plan_path.write_text(plan_template)
self.notes_path.write_text("# Research Notes\n\n")
def append_note(self, note: str):
"""Append to notes (never modify existing)."""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M")
entry = f"\n## {timestamp}\n{note}\n"
with self.notes_path.open("a") as f:
f.write(entry)
def update_progress(self, phase: int, status: str):
"""Update phase checkbox in plan."""
plan = self.plan_path.read_text()
# Flip the checkbox, keeping the phase description intact
old = f"- [ ] Phase {phase}:"
new = f"- [x] Phase {phase}: ✅ {status} -"
plan = plan.replace(old, new)
self.plan_path.write_text(plan)
def get_context_for_agent(self) -> str:
"""Get full context for agent."""
plan = self.plan_path.read_text() if self.plan_path.exists() else ""
notes = self.notes_path.read_text() if self.notes_path.exists() else ""
return f"""
## Current Task Plan
{plan}
## Research Notes (read for context)
{notes}
---
Continue from where you left off. Update the plan as you make progress.
"""
Effort: Low
Verdict: YES - Simple, debuggable state management.
What it is: Only append to notes, never modify history.
Current BLACKICE approach: Event store is append-only.
Why adopt: Audit trail. No lost information. Easy to follow timeline.
Implementation sketch:
class AppendOnlyLog:
"""Append-only log with structured entries."""
def __init__(self, path: Path):
self.path = path
def append(self, entry_type: str, content: str, metadata: dict = None):
"""Append entry with timestamp and type."""
timestamp = datetime.now().isoformat()
entry = {
"timestamp": timestamp,
"type": entry_type,
"content": content,
"metadata": metadata or {}
}
with self.path.open("a") as f:
f.write(f"\n---\n")
f.write(f"**[{timestamp}]** `{entry_type}`\n\n")
f.write(content)
f.write("\n")
def find_entries(self, entry_type: str) -> list[str]:
"""Find all entries of a type."""
text = self.path.read_text()
entries = []
for section in text.split("\n---\n"):
if f"`{entry_type}`" in section:
entries.append(section)
return entries
# Usage
log = AppendOnlyLog(Path(".agent/task-123/notes.md"))
log.append("discovery", "Found that the API requires auth token in header")
log.append("decision", "Will use OAuth2 client credentials flow")
log.append("blocker", "API rate limit hit, waiting 60 seconds")
log.append("resolution", "Implemented retry with exponential backoff")
Effort: Low
Verdict: YES - Already have this pattern in Beads.
What it is: Only use structured planning for complex tasks (3+ steps).
Current BLACKICE approach: Same process for all tasks.
Why adopt: Don't over-engineer simple tasks; reserve the planning overhead for complex ones.
Implementation sketch:
class TaskComplexityDetector:
"""Detect if task needs structured planning."""
COMPLEXITY_INDICATORS = [
r"multiple\s+files?",
r"several\s+steps?",
r"refactor",
r"migrate",
r"integrate",
r"implement.*feature",
r"debug.*complex",
r"across.*modules?",
]
SIMPLE_INDICATORS = [
r"fix\s+typo",
r"update\s+version",
r"add\s+comment",
r"rename",
r"simple\s+change",
]
def needs_structured_planning(self, task: Task) -> bool:
"""Check if task needs full planning infrastructure."""
description = task.description.lower()
# Check for simple task indicators
for pattern in self.SIMPLE_INDICATORS:
if re.search(pattern, description):
return False
# Check for complexity indicators
complexity_score = 0
for pattern in self.COMPLEXITY_INDICATORS:
if re.search(pattern, description):
complexity_score += 1
return complexity_score >= 2
# Usage in flywheel
detector = TaskComplexityDetector()
if detector.needs_structured_planning(task):
workspace = TaskWorkspace(task.id)
workspace.init(task)
await run_with_planning(task, workspace)
else:
await run_simple(task)
Effort: Low
Verdict: YES - Don't over-engineer simple tasks.
Why skip: BLACKICE should manage state automatically.
Why skip: Some patterns are specific to Manus's architecture.
| Feature | Worth Adopting? | Effort | Priority |
|---|---|---|---|
| Forced Attention Recovery | YES | Low | High |
| 3-File State Pattern | YES | Low | Medium |
| Append-Only Notes | YES | Low | Medium |
| Conditional Activation | YES | Low | Low |
<!-- Source Gist 14 of 19: 4de321819ca80dc51ca0d5f6ce0926db -->
Petit Ideas for BLACKICE
Ideas from Petit for BLACKICE.
A lightweight Rust task scheduler with DAG execution, designed for embedded/minimal environments.
| Aspect | Petit | BLACKICE |
|---|---|---|
| Focus | Task scheduling with dependencies | Iterate-until-success |
| Language | Rust | Python |
| Execution | DAG topological sort | Sequential + parallel |
| State | SQLite or in-memory | Beads event store |
- DAG Dependency Resolution - Topological sort for execution order
- Conditional Execution - `all_success`, `on_failure`, `all_done`
- Cron Scheduling - 6-field timezone-aware expressions
- Concurrency Limits - Max tasks/jobs to prevent exhaustion
- Pluggable Storage - SQLite or in-memory backends
What it is: Tasks specify when they should run based on dependency status.
Current BLACKICE approach: Tasks run when dependencies complete (success only).
Why adopt: Handle failure paths gracefully. Run cleanup on failure.
Implementation sketch:
from enum import Enum
class ExecutionCondition(Enum):
ALL_SUCCESS = "all_success" # Run only if all deps succeeded
ALL_DONE = "all_done" # Run when all deps done (success or fail)
ALL_FAILED = "all_failed" # Run only if all deps failed
ANY_SUCCESS = "any_success" # Run if any dep succeeded
ANY_FAILED = "any_failed" # Run if any dep failed
ALWAYS = "always" # Always run regardless
@dataclass
class TaskNode:
id: str
name: str
depends_on: list[str]
condition: ExecutionCondition = ExecutionCondition.ALL_SUCCESS
def should_run(self, dep_results: dict[str, TaskResult]) -> bool:
if not self.depends_on:
return True
dep_statuses = [dep_results[d].status for d in self.depends_on]
match self.condition:
case ExecutionCondition.ALL_SUCCESS:
return all(s == "success" for s in dep_statuses)
case ExecutionCondition.ALL_DONE:
return all(s in ("success", "failed") for s in dep_statuses)
case ExecutionCondition.ALL_FAILED:
return all(s == "failed" for s in dep_statuses)
case ExecutionCondition.ANY_SUCCESS:
return any(s == "success" for s in dep_statuses)
case ExecutionCondition.ANY_FAILED:
return any(s == "failed" for s in dep_statuses)
case ExecutionCondition.ALWAYS:
return True
# Example: Cleanup task runs on failure
cleanup_task = TaskNode(
id="cleanup",
name="Cleanup on failure",
depends_on=["deploy"],
condition=ExecutionCondition.ANY_FAILED
)
# Example: Notification runs always
notify_task = TaskNode(
id="notify",
name="Send completion notification",
depends_on=["deploy", "cleanup"],
condition=ExecutionCondition.ALL_DONE
)
Effort: Low
Verdict: YES - Essential for robust workflows.
What it is: Max simultaneous tasks to prevent resource exhaustion.
Current BLACKICE approach: No explicit limits.
Why adopt: Don't overwhelm GPU, API rate limits, or memory.
Implementation sketch:
import asyncio
from dataclasses import dataclass
@dataclass
class ConcurrencyConfig:
max_total_tasks: int = 10 # Global limit
max_tasks_per_job: int = 5 # Per-workflow limit
max_tasks_per_model: dict[str, int] | None = None  # Per-model limits
def __post_init__(self):
if self.max_tasks_per_model is None:
self.max_tasks_per_model = {
"claude-opus": 2, # Expensive, limit concurrency
"claude-sonnet": 5,
"ollama/qwen": 10, # Local, can run more
}
class ConcurrencyLimiter:
def __init__(self, config: ConcurrencyConfig):
self.config = config
self._global_semaphore = asyncio.Semaphore(config.max_total_tasks)
self._model_semaphores: dict[str, asyncio.Semaphore] = {}
self._job_semaphores: dict[str, asyncio.Semaphore] = {}
def _get_model_semaphore(self, model: str) -> asyncio.Semaphore:
if model not in self._model_semaphores:
limit = self.config.max_tasks_per_model.get(model, 5)
self._model_semaphores[model] = asyncio.Semaphore(limit)
return self._model_semaphores[model]
def _get_job_semaphore(self, job_id: str) -> asyncio.Semaphore:
if job_id not in self._job_semaphores:
self._job_semaphores[job_id] = asyncio.Semaphore(self.config.max_tasks_per_job)
return self._job_semaphores[job_id]
async def acquire(self, task: Task):
"""Acquire all required semaphores."""
await self._global_semaphore.acquire()
await self._get_model_semaphore(task.model).acquire()
await self._get_job_semaphore(task.job_id).acquire()
def release(self, task: Task):
"""Release all semaphores."""
self._get_job_semaphore(task.job_id).release()
self._get_model_semaphore(task.model).release()
self._global_semaphore.release()
async def run_with_limits(self, task: Task, executor: Callable):
"""Execute task within concurrency limits."""
await self.acquire(task)
try:
return await executor(task)
finally:
self.release(task)Effort: Low
Verdict: YES - Essential for production.
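The layered-semaphore idea can be verified without the full limiter. A minimal sketch (names illustrative): cap concurrency at 2 with one semaphore, as the global layer of `ConcurrencyLimiter` would, and confirm the cap held:

```python
import asyncio

async def main() -> int:
    sem = asyncio.Semaphore(2)  # stand-in for the global semaphore
    running = 0
    peak = 0

    async def task(i: int) -> None:
        nonlocal running, peak
        async with sem:  # acquire/release, as in run_with_limits
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)
            running -= 1

    # Six tasks contend for two slots.
    await asyncio.gather(*(task(i) for i in range(6)))
    return peak

peak = asyncio.run(main())
print(peak)  # 2: the semaphore never let a third task run concurrently
```

The per-model and per-job layers compose the same way: a task proceeds only once it holds all three semaphores.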
What it is: Tasks in one workflow can depend on tasks in another.
Current BLACKICE approach: Dependencies only within a single workflow.
Why adopt: Complex projects need cross-workflow coordination.
Implementation sketch:
@dataclass
class TaskRef:
job_id: str
task_id: str
def __str__(self):
return f"{self.job_id}:{self.task_id}"
@dataclass
class TaskNode:
id: str
job_id: str
depends_on: list[TaskRef] # Can reference other jobs
class CrossJobExecutor:
"""Execute tasks with cross-job dependencies."""
def __init__(self):
self.results: dict[str, TaskResult] = {} # "job:task" -> result
async def execute_task(self, task: TaskNode):
# Wait for all dependencies (even from other jobs)
for dep in task.depends_on:
dep_key = str(dep)
while dep_key not in self.results:
await asyncio.sleep(0.1)  # simple polling; an asyncio.Event per dependency would avoid busy-waiting
# Check if should run based on dep results
dep_results = {str(d): self.results[str(d)] for d in task.depends_on}
if not task.should_run(dep_results):
self.results[f"{task.job_id}:{task.id}"] = TaskResult(status="skipped")
return
# Execute
result = await self._run(task)
self.results[f"{task.job_id}:{task.id}"] = result
# Example: Deploy job depends on build job
build_task = TaskNode(
id="compile",
job_id="build",
depends_on=[]
)
deploy_task = TaskNode(
id="deploy",
job_id="deploy",
depends_on=[TaskRef("build", "compile")] # Cross-job dependency
)
Effort: Medium
Verdict: YES - Useful for complex workflows.
What it is: Choice between simple fixed-delay retries or exponential backoff.
Current BLACKICE approach: Exponential backoff only.
Why adopt: Some tasks benefit from fixed delay (e.g., waiting for an external service).
Implementation sketch:
from enum import Enum
class RetryStrategy(Enum):
FIXED_DELAY = "fixed"
EXPONENTIAL = "exponential"
LINEAR = "linear"
@dataclass
class RetryConfig:
strategy: RetryStrategy
max_retries: int
base_delay: float # seconds
max_delay: float = 300 # cap for exponential
def get_delay(self, attempt: int) -> float:
match self.strategy:
case RetryStrategy.FIXED_DELAY:
return self.base_delay
case RetryStrategy.EXPONENTIAL:
delay = self.base_delay * (2 ** attempt)
return min(delay, self.max_delay)
case RetryStrategy.LINEAR:
delay = self.base_delay * (attempt + 1)
return min(delay, self.max_delay)
# Task-specific retry configs
RETRY_CONFIGS = {
"api_call": RetryConfig(RetryStrategy.EXPONENTIAL, max_retries=5, base_delay=1),
"file_wait": RetryConfig(RetryStrategy.FIXED_DELAY, max_retries=60, base_delay=1),
"build": RetryConfig(RetryStrategy.LINEAR, max_retries=3, base_delay=10),
}
Effort: Low
Verdict: YES - Flexibility is good.
Why skip: BLACKICE is Python. Don't fragment the stack.
Why skip: BLACKICE is event-driven, not scheduled.
Why skip: BLACKICE targets GPUs, not embedded systems.
| Feature | Worth Adopting? | Effort | Priority |
|---|---|---|---|
| Conditional Execution | YES | Low | High |
| Concurrency Limits | YES | Low | High |
| Cross-Job Dependencies | YES | Medium | Medium |
| Flexible Retry Strategies | YES | Low | Low |
<!-- Source Gist 15 of 19: 5f4cb9ddbde4f88559f4bfb2df27d99f -->
Plannotator Ideas for BLACKICE
Ideas from Plannotator for BLACKICE.
A visual plan review system where humans annotate AI plans (delete, insert, replace) before approval.
| Aspect | Plannotator | BLACKICE |
|---|---|---|
| Focus | Visual plan annotation | Iterate-until-success |
| Interface | Browser UI | CLI |
| Feedback | Structured annotations | Success/failure |
| License | BSL 1.1 (restrictive) | MIT |
- Visual Plan Markup - Delete, insert, replace, comment operations
- Image Attachments - Drawing tools for UI mockups
- Structured Feedback Format - Machine-readable annotations
- Auto-Save - Export to Obsidian/Bear Notes
- Plugin Architecture - Works with Claude Code and OpenCode
What it is: Human annotations converted to machine-readable format.
Current BLACKICE approach: Unstructured user feedback.
Why adopt: Agents can parse and act on structured feedback precisely.
Implementation sketch:
from enum import Enum
class AnnotationType(Enum):
DELETE = "delete"
INSERT = "insert"
REPLACE = "replace"
COMMENT = "comment"
APPROVE = "approve"
REJECT = "reject"
@dataclass
class Annotation:
type: AnnotationType
target: str # What's being annotated
line_start: int | None
line_end: int | None
content: str | None # New content for insert/replace
comment: str | None # Human explanation
@dataclass
class AnnotatedPlan:
original_plan: str
annotations: list[Annotation]
overall_status: Literal["approved", "needs_changes", "rejected"]
summary: str
def apply_annotations(self) -> str:
"""Apply annotations to generate revised plan."""
lines = self.original_plan.split("\n")
# Sort by line number descending (apply from bottom up)
sorted_annotations = sorted(
[a for a in self.annotations if a.line_start is not None],
key=lambda a: a.line_start,
reverse=True
)
for annotation in sorted_annotations:
match annotation.type:
case AnnotationType.DELETE:
del lines[annotation.line_start:annotation.line_end]
case AnnotationType.INSERT:
lines.insert(annotation.line_start, annotation.content)
case AnnotationType.REPLACE:
lines[annotation.line_start:annotation.line_end] = [annotation.content]
return "\n".join(lines)
def to_agent_prompt(self) -> str:
"""Convert annotations to prompt for agent."""
if self.overall_status == "approved":
return "Plan approved. Proceed with implementation."
feedback = ["Human feedback on your plan:", ""]
for a in self.annotations:
match a.type:
case AnnotationType.DELETE:
feedback.append(f"❌ DELETE lines {a.line_start}-{a.line_end}: {a.comment or 'Remove this'}")
case AnnotationType.INSERT:
feedback.append(f"➕ INSERT at line {a.line_start}: {a.content}")
case AnnotationType.REPLACE:
feedback.append(f"🔄 REPLACE lines {a.line_start}-{a.line_end} with: {a.content}")
case AnnotationType.COMMENT:
feedback.append(f"💬 COMMENT on lines {a.line_start}-{a.line_end}: {a.comment}")
feedback.append("")
feedback.append("Please revise your plan based on this feedback.")
return "\n".join(feedback)
Effort: Medium
Verdict: YES - Clear feedback format.
What it is: Decouple plan generation from execution via browser review.
Current BLACKICE approach: Synchronous consensus voting.
Why adopt: Human review doesn't block agents. Review when convenient.
Implementation sketch:
import asyncio
from uuid import uuid4
@dataclass
class PendingReview:
id: str
plan: str
submitted_at: datetime
reviewed: bool = False
annotations: AnnotatedPlan | None = None
class AsyncReviewQueue:
"""Queue plans for async human review."""
def __init__(self):
self.pending: dict[str, PendingReview] = {}
self._review_events: dict[str, asyncio.Event] = {}
async def submit_for_review(self, plan: str, timeout: float = 3600) -> AnnotatedPlan:
"""Submit plan and wait for human review."""
review_id = str(uuid4())
self.pending[review_id] = PendingReview(
id=review_id,
plan=plan,
submitted_at=datetime.now()
)
self._review_events[review_id] = asyncio.Event()
# Notify human (webhook, email, desktop notification)
await self._notify_reviewer(review_id, plan)
# Wait for review (with timeout)
try:
await asyncio.wait_for(
self._review_events[review_id].wait(),
timeout=timeout
)
except asyncio.TimeoutError:
raise ReviewTimeoutError(f"Review {review_id} timed out")
return self.pending[review_id].annotations
async def complete_review(self, review_id: str, annotations: AnnotatedPlan):
"""Human completes review via API."""
if review_id not in self.pending:
raise ValueError(f"Unknown review: {review_id}")
self.pending[review_id].annotations = annotations
self.pending[review_id].reviewed = True
self._review_events[review_id].set()
# Web API for human review
@app.post("/api/reviews/{review_id}")
async def submit_review(review_id: str, annotations: AnnotatedPlan):
await review_queue.complete_review(review_id, annotations)
return {"status": "received"}
Effort: Medium
Verdict: YES - Better UX for human review.
What it is: Web UI for plan manipulation with visual tools.
Current BLACKICE approach: CLI only.
Why adopt: Non-technical stakeholders can review AI plans visually.
Implementation sketch:
# Backend API for plan review UI
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
app = FastAPI()
@app.get("/api/plans/{plan_id}")
async def get_plan(plan_id: str):
"""Get plan for review."""
plan = await beads.get_plan(plan_id)
return {
"id": plan_id,
"content": plan.content,
"lines": plan.content.split("\n"),
"metadata": plan.metadata
}
@app.post("/api/plans/{plan_id}/annotations")
async def save_annotations(plan_id: str, annotations: list[Annotation]):
"""Save annotations from UI."""
await beads.save_annotations(plan_id, annotations)
return {"status": "saved"}
# Mount React/Vue UI
app.mount("/", StaticFiles(directory="ui/dist", html=True))
Effort: High (requires frontend)
Verdict: MAYBE - Nice but CLI may be sufficient.
Why skip: Business Source License restricts commercial use. BLACKICE is MIT.
Why skip: Too niche. General file export is sufficient.
| Feature | Worth Adopting? | Effort | Priority |
|---|---|---|---|
| Structured Feedback Format | YES | Medium | High |
| Async Human-in-the-Loop | YES | Medium | Medium |
| Visual Plan Editing | MAYBE | High | Low |
<!-- Source Gist 16 of 19: 5d430f8cf367b9f1e02b660d7edae31f -->
Wayfound MCP Supervisor Ideas for BLACKICE
Ideas from Wayfound MCP Supervisor for BLACKICE.
AI supervision for agentic systems via Model Context Protocol. Agents query guidelines, receive feedback, and iterate until quality thresholds are met.
| Aspect | Wayfound | BLACKICE |
|---|---|---|
| Focus | Quality supervision & grading | Iterate-until-success |
| Integration | MCP (SSE) | Direct API calls |
| Feedback | Letter grades (A-F) | Success/failure |
| Learning | Historical session analysis | Beads + Reflexion |
- Pre-Execution Guidance - Query guidelines before starting work
- Pitfall Identification - Learn from common issues in past sessions
- Graded Evaluation - Letter grades with detailed breakdown
- Iterative Refinement - Loop until grade ≥ A-
- Session Transparency - Full breakdown of what passed/failed
What it is: Before starting work, agent queries for relevant guidelines.
Current BLACKICE approach: Guidelines baked into prompts.
Why adopt: Dynamic guidelines that evolve. Don't update prompts for every rule change.
Implementation sketch:
@dataclass
class Guideline:
id: str
category: str
rule: str
severity: Literal["must", "should", "may"]
examples: list[str]
class GuidelinesStore:
"""Store and retrieve coding guidelines."""
def __init__(self, db_path: Path):
self.db = sqlite3.connect(db_path)
self.db.row_factory = sqlite3.Row  # rows unpack as mappings for Guideline(**row)
def get_for_task(self, task_type: str, language: str) -> list[Guideline]:
"""Get relevant guidelines for task."""
query = """
SELECT * FROM guidelines
WHERE (task_type = ? OR task_type = 'all')
AND (language = ? OR language = 'all')
ORDER BY severity DESC
"""
rows = self.db.execute(query, (task_type, language)).fetchall()
return [Guideline(**row) for row in rows]
def format_for_prompt(self, guidelines: list[Guideline]) -> str:
"""Format guidelines for agent prompt."""
sections = {"must": [], "should": [], "may": []}
for g in guidelines:
sections[g.severity].append(f"- {g.rule}")
return f"""
## Coding Guidelines
### MUST (Required)
{chr(10).join(sections['must'])}
### SHOULD (Recommended)
{chr(10).join(sections['should'])}
### MAY (Optional)
{chr(10).join(sections['may'])}
"""
# Usage before task execution
guidelines = store.get_for_task("code_review", "python")
guidelines_prompt = store.format_for_prompt(guidelines)
full_prompt = f"{guidelines_prompt}\n\n{task.description}"
Effort: Low-Medium
Verdict: YES - Dynamic, maintainable guidelines.
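The severity grouping above can be checked standalone. A sketch with `Guideline` trimmed to the two fields used; the "(none)" fallback is an addition here so empty sections stay readable:

```python
from dataclasses import dataclass

@dataclass
class Guideline:
    rule: str
    severity: str  # "must" | "should" | "may"

def format_for_prompt(guidelines: list[Guideline]) -> str:
    # Group rules by severity, then emit the three sections in order.
    sections: dict[str, list[str]] = {"must": [], "should": [], "may": []}
    for g in guidelines:
        sections[g.severity].append(f"- {g.rule}")
    parts = ["## Coding Guidelines"]
    for label, title in [("must", "MUST (Required)"),
                         ("should", "SHOULD (Recommended)"),
                         ("may", "MAY (Optional)")]:
        parts.append(f"### {title}")
        parts.extend(sections[label] or ["- (none)"])
    return "\n".join(parts)

prompt = format_for_prompt([
    Guideline("Type-annotate all public functions", "must"),
    Guideline("Prefer dataclasses over dicts", "should"),
])
print(prompt)
```

Keeping the grouping in one pure function makes the prompt layout trivially testable, independent of the SQLite store.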
What it is: Grade agent outputs A-F with clear thresholds.
Current BLACKICE approach: Binary success/failure.
Why adopt: Nuanced feedback. "C" is different from "F". Enables quality thresholds.
Implementation sketch:
from enum import Enum
class Grade(Enum):
A = 4.0
A_MINUS = 3.7
B_PLUS = 3.3
B = 3.0
B_MINUS = 2.7
C_PLUS = 2.3
C = 2.0
C_MINUS = 1.7
D = 1.0
F = 0.0
@dataclass
class Evaluation:
grade: Grade
breakdown: dict[str, float] # criterion -> score
feedback: str
passed: bool
@classmethod
def from_scores(cls, scores: dict[str, float], threshold: Grade = Grade.B) -> "Evaluation":
avg = sum(scores.values()) / len(scores)
grade = cls._score_to_grade(avg)
return cls(
grade=grade,
breakdown=scores,
feedback=cls._generate_feedback(scores),
passed=grade.value >= threshold.value
)
@staticmethod
def _score_to_grade(score: float) -> Grade:
if score >= 0.95: return Grade.A
if score >= 0.90: return Grade.A_MINUS
if score >= 0.85: return Grade.B_PLUS
if score >= 0.80: return Grade.B
if score >= 0.75: return Grade.B_MINUS
if score >= 0.70: return Grade.C_PLUS
if score >= 0.65: return Grade.C
if score >= 0.60: return Grade.C_MINUS
if score >= 0.50: return Grade.D
return Grade.F
class OutputEvaluator:
"""Evaluate agent outputs with letter grades."""
CRITERIA = [
"correctness", # Does it work?
"completeness", # Is it done?
"code_quality", # Is it clean?
"test_coverage", # Is it tested?
"documentation", # Is it documented?
]
async def evaluate(self, output: AgentOutput) -> Evaluation:
scores = {}
for criterion in self.CRITERIA:
scores[criterion] = await self._score_criterion(output, criterion)
return Evaluation.from_scores(scores)
async def _score_criterion(self, output: AgentOutput, criterion: str) -> float:
# Use another LLM to evaluate
prompt = f"Score this {criterion} from 0 to 1:\n{output.code}"
score_str = await self.evaluator_llm.run(prompt)
return float(score_str.strip())
Effort: Medium
Verdict: YES - Better than binary pass/fail.
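The score-to-grade mapping is easy to sanity-check in isolation. A sketch with string grades instead of the Grade enum, for brevity, using the same thresholds as `_score_to_grade` above:

```python
def score_to_grade(score: float) -> str:
    # Same cutoffs as Evaluation._score_to_grade in the sketch above.
    bands = [(0.95, "A"), (0.90, "A-"), (0.85, "B+"), (0.80, "B"),
             (0.75, "B-"), (0.70, "C+"), (0.65, "C"), (0.60, "C-"),
             (0.50, "D")]
    for cutoff, grade in bands:
        if score >= cutoff:
            return grade
    return "F"

scores = {"correctness": 1.0, "completeness": 0.75, "code_quality": 0.75,
          "test_coverage": 0.5, "documentation": 1.0}
avg = sum(scores.values()) / len(scores)
print(avg, score_to_grade(avg))  # 0.8 B
```

The breakdown is where the nuance lives: this output "passes" at a B threshold, but `test_coverage` at 0.5 tells the next iteration exactly what to fix.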
What it is: Keep improving until output meets grade threshold.
Current BLACKICE approach: Iterate until success or max iterations.
Why adopt: "Success" is vague; a grade threshold is measurable.
Implementation sketch:
class QualityGatedLoop:
"""Iterate until quality threshold met."""
def __init__(
self,
evaluator: OutputEvaluator,
threshold: Grade = Grade.A_MINUS,
max_iterations: int = 5
):
self.evaluator = evaluator
self.threshold = threshold
self.max_iterations = max_iterations
async def run(self, agent: Agent, task: Task) -> tuple[AgentOutput, Evaluation]:
best_output = None
best_eval = None
for iteration in range(self.max_iterations):
# Generate output
output = await agent.run(task)
# Evaluate
evaluation = await self.evaluator.evaluate(output)
# Track best
if best_eval is None or evaluation.grade.value > best_eval.grade.value:
best_output = output
best_eval = evaluation
# Check threshold
if evaluation.passed:
return output, evaluation
# Generate improvement prompt
improvement_prompt = f"""
Your previous output received grade: {evaluation.grade.name}
Breakdown:
{json.dumps(evaluation.breakdown, indent=2)}
Feedback: {evaluation.feedback}
Please improve your output to achieve at least {self.threshold.name}.
Focus on the lowest-scoring criteria.
"""
task = Task(
description=f"{task.description}\n\n{improvement_prompt}",
id=task.id
)
# Return best even if threshold not met
return best_output, best_eval
Effort: Medium
Verdict: YES - Quality-driven iteration is better.
What it is: Analyze past sessions to identify frequent issues.
Current BLACKICE approach: Reflexion learns from failures.
Why adopt: Proactive prevention. Show pitfalls BEFORE agent makes them.
Implementation sketch:
@dataclass
class Pitfall:
issue: str
frequency: float # Percentage of sessions with this issue
prevention: str # How to avoid it
example: str | None
class PitfallAnalyzer:
"""Analyze past sessions for common issues."""
def __init__(self, beads: BeadsClient):
self.beads = beads
async def analyze_history(self, task_type: str, limit: int = 100) -> list[Pitfall]:
"""Find common pitfalls from past sessions."""
# Get past sessions with failures
sessions = await self.beads.query(
event_type="task_failed",
task_type=task_type,
limit=limit
)
# Count issue types
issue_counts = Counter()
for session in sessions:
issues = self._extract_issues(session)
issue_counts.update(issues)
# Convert to pitfalls
total = len(sessions)
pitfalls = []
for issue, count in issue_counts.most_common(10):
pitfalls.append(Pitfall(
issue=issue,
frequency=count / total,
prevention=self._get_prevention(issue),
example=self._get_example(issue, sessions)
))
return pitfalls
def format_for_prompt(self, pitfalls: list[Pitfall]) -> str:
"""Format pitfalls as warning for agent."""
if not pitfalls:
return ""
lines = ["## Common Pitfalls to Avoid", ""]
for p in pitfalls:
lines.append(f"- **{p.issue}** ({p.frequency:.0%} of past attempts)")
lines.append(f" Prevention: {p.prevention}")
return "\n".join(lines)
Effort: Medium
Verdict: YES - Learn from history proactively.
Why skip: BLACKICE should work offline. Don't require external service.
Why skip: Simpler to use direct function calls than SSE streaming.
| Feature | Worth Adopting? | Effort | Priority |
|---|---|---|---|
| Pre-Execution Guidelines | YES | Low | High |
| Letter Grade Evaluation | YES | Medium | High |
| Quality-Gated Iteration | YES | Medium | Medium |
| Common Pitfall Analysis | YES | Medium | Medium |
<!-- Source Gist 17 of 19: 4d1f6eee5b6f72d8b3f5f89c50a1eece -->
Ralph Orchestrator Ideas for BLACKICE
Ideas from Ralph Orchestrator for BLACKICE.
An autonomous AI agent loop that runs agents against a prompt file until task completion or limits are reached.
| Aspect | Ralph Orchestrator | BLACKICE |
|---|---|---|
| Focus | Iterate until complete | Iterate until success + consensus |
| Language | TypeScript/Python | Python |
| State | Git checkpoints + .agent/ workspace | Beads event store |
| Agents | Claude, Q Chat, Gemini, ACP | Claude, Ollama, Letta |
| Tests | 920+ tests | 18K lines of tests |
- Completion Marker Detection - Check for task completion, not assume success
- Git-Based Checkpointing - Async state preservation for recovery
- Adaptive Permission Framework - Graduated tool access control
- Agent Context Persistence - Scratchpad files maintain state across iterations
- Security Masking - Auto-mask API keys in logs
What it is: Explicitly check if agent marked task as complete, don't assume.
Current BLACKICE approach: Check for success/failure, but not explicit completion markers.
Why adopt: Agents should self-report completion status. Clearer than inferring from output.
Implementation sketch:
@dataclass
class CompletionMarker:
status: Literal["complete", "blocked", "in_progress", "failed"]
reason: str | None
next_steps: list[str] | None
class CompletionDetector:
"""Detect task completion from agent output."""
COMPLETION_PATTERNS = {
"complete": [
r"✅\s*TASK\s*COMPLETE",
r"\[DONE\]",
r"Task completed successfully",
],
"blocked": [
r"❌\s*BLOCKED",
r"\[BLOCKED\]",
r"Cannot proceed.*need",
],
"in_progress": [
r"🔄\s*IN\s*PROGRESS",
r"\[WIP\]",
r"Continuing with",
],
}
def detect(self, output: str) -> CompletionMarker:
for status, patterns in self.COMPLETION_PATTERNS.items():
for pattern in patterns:
if re.search(pattern, output, re.IGNORECASE):
return CompletionMarker(status=status, reason=output[-500:], next_steps=None)
# Default to in_progress if no marker found
return CompletionMarker(status="in_progress", reason=None, next_steps=None)
def require_completion(self, agent: Agent, task: Task) -> str:
"""Force agent to include completion marker."""
prompt = f"""
{task.description}
IMPORTANT: You MUST end your response with one of these markers:
- ✅ TASK COMPLETE - if the task is fully done
- ❌ BLOCKED: <reason> - if you cannot proceed
- 🔄 IN PROGRESS: <next step> - if more work is needed
Do not end without a marker.
"""
return prompt
Effort: Low
Verdict: YES - Clear completion semantics.
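The pattern table can be exercised directly. A trimmed sketch with two patterns per status (priority order: a completion marker wins over a blocked marker):

```python
import re

COMPLETION_PATTERNS = {
    "complete": [r"\[DONE\]", r"Task completed successfully"],
    "blocked": [r"\[BLOCKED\]", r"Cannot proceed.*need"],
}

def detect(output: str) -> str:
    # Scan statuses in declaration order; default to in_progress when
    # no marker is found, as in CompletionDetector.detect above.
    for status, patterns in COMPLETION_PATTERNS.items():
        if any(re.search(p, output, re.IGNORECASE) for p in patterns):
            return status
    return "in_progress"

print(detect("All tests pass. [DONE]"))          # complete
print(detect("Cannot proceed: need API key"))    # blocked
print(detect("Refactoring the parser next..."))  # in_progress
```

Defaulting to "in_progress" is the safe choice: an agent that forgot its marker gets another iteration rather than a false "complete".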
What it is: Graduated control over what tools agents can use.
Current BLACKICE approach: All-or-nothing tool access.
Why adopt: Different tasks need different permissions. Don't give file deletion to a documentation agent.
Implementation sketch:
from enum import Enum
class PermissionMode(Enum):
AUTO_APPROVE = "auto_approve" # Trust agent completely
ALLOWLIST = "allowlist" # Only specific tools
DENYLIST = "denylist" # Block specific tools
INTERACTIVE = "interactive" # Ask human each time
DENY_ALL = "deny_all" # Read-only mode
@dataclass
class PermissionPolicy:
mode: PermissionMode
allowed_tools: list[str] = field(default_factory=list)
denied_tools: list[str] = field(default_factory=list)
# Per-role permission policies
ROLE_PERMISSIONS = {
"explorer": PermissionPolicy(
mode=PermissionMode.ALLOWLIST,
allowed_tools=["read_file", "grep", "list_directory", "web_search"]
),
"implementer": PermissionPolicy(
mode=PermissionMode.DENYLIST,
denied_tools=["rm", "delete", "drop_database", "format"]
),
"reviewer": PermissionPolicy(
mode=PermissionMode.ALLOWLIST,
allowed_tools=["read_file", "grep", "run_tests"]
),
"deployer": PermissionPolicy(
mode=PermissionMode.INTERACTIVE, # Human approves each action
),
}
class PermissionGuard:
def __init__(self, policy: PermissionPolicy):
self.policy = policy
async def check(self, tool: str, args: dict) -> bool:
match self.policy.mode:
case PermissionMode.AUTO_APPROVE:
return True
case PermissionMode.DENY_ALL:
return False
case PermissionMode.ALLOWLIST:
return tool in self.policy.allowed_tools
case PermissionMode.DENYLIST:
return tool not in self.policy.denied_tools
case PermissionMode.INTERACTIVE:
return await self.ask_human(tool, args)
Effort: Medium
Verdict: YES - Essential for security.
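A synchronous cut-down of `PermissionGuard.check`, covering just the two list modes, shows the role policies in action:

```python
from dataclasses import dataclass, field
from enum import Enum

class PermissionMode(Enum):
    ALLOWLIST = "allowlist"
    DENYLIST = "denylist"

@dataclass
class PermissionPolicy:
    mode: PermissionMode
    allowed_tools: list[str] = field(default_factory=list)
    denied_tools: list[str] = field(default_factory=list)

def check(policy: PermissionPolicy, tool: str) -> bool:
    # Allowlist: only named tools pass. Denylist: everything but named tools.
    if policy.mode is PermissionMode.ALLOWLIST:
        return tool in policy.allowed_tools
    return tool not in policy.denied_tools

reviewer = PermissionPolicy(PermissionMode.ALLOWLIST,
                            allowed_tools=["read_file", "grep", "run_tests"])
implementer = PermissionPolicy(PermissionMode.DENYLIST,
                               denied_tools=["rm", "drop_database"])
print(check(reviewer, "write_file"))     # False: reviewers are read-only
print(check(implementer, "write_file"))  # True: only destructive tools blocked
```

Allowlists fail closed and denylists fail open, which is why the reviewer role gets the former and only the trusted implementer gets the latter.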
What it is: Agents maintain notes across iterations in a scratchpad file.
Current BLACKICE approach: Context from Beads events.
Why adopt: A scratchpad is simpler for the agent to read and write, with less overhead than event replay.
Implementation sketch:
class AgentScratchpad:
"""Persistent scratchpad for agent notes."""
def __init__(self, task_id: str):
self.path = Path(f".agent/{task_id}/scratchpad.md")
self.path.parent.mkdir(parents=True, exist_ok=True)
def read(self) -> str:
if self.path.exists():
return self.path.read_text()
return ""
def append(self, note: str):
"""Append note with timestamp."""
timestamp = datetime.now().isoformat()
entry = f"\n## {timestamp}\n{note}\n"
with self.path.open("a") as f:
f.write(entry)
def get_context_prompt(self) -> str:
"""Get scratchpad as context for agent."""
notes = self.read()
if not notes:
return ""
return f"""
## Previous Notes (from earlier iterations)
{notes}
---
Continue from where you left off.
"""
# Usage in Ralph Loop
scratchpad = AgentScratchpad(task.id)
for iteration in range(max_iterations):
context = scratchpad.get_context_prompt()
prompt = f"{context}\n\n{task.description}"
result = await agent.run(prompt)
# Agent's notes persist for next iteration
scratchpad.append(result.notes)
Effort: Low
Verdict: YES - Simple and effective.
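The scratchpad round-trips cleanly. Same shape as above, but rooted in a caller-supplied temporary directory so the example doesn't write into the project's .agent/ tree (that root argument is the only change):

```python
import tempfile
from datetime import datetime
from pathlib import Path

class AgentScratchpad:
    """Persistent scratchpad, rooted in an explicit directory."""

    def __init__(self, root: Path, task_id: str):
        self.path = root / task_id / "scratchpad.md"
        self.path.parent.mkdir(parents=True, exist_ok=True)

    def read(self) -> str:
        return self.path.read_text() if self.path.exists() else ""

    def append(self, note: str) -> None:
        # Each note becomes a timestamped markdown section.
        with self.path.open("a") as f:
            f.write(f"\n## {datetime.now().isoformat()}\n{note}\n")

with tempfile.TemporaryDirectory() as tmp:
    pad = AgentScratchpad(Path(tmp), "task-42")
    pad.append("Tried approach A; blocked on missing fixture.")
    pad.append("Approach B works; finishing tests next.")
    notes = pad.read()
    print(notes.count("## "))  # 2: one section per iteration
```

Because notes are plain markdown on disk, a human can inspect or edit the agent's working memory between iterations.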
What it is: Automatically redact API keys and secrets from logs.
Current BLACKICE approach: Hope secrets aren't logged.
Why adopt: Defense in depth. Logs are often exposed.
Implementation sketch:
import re
class SecretMasker:
"""Mask secrets in log output."""
PATTERNS = [
(r"sk-[a-zA-Z0-9]{48}", "sk-***REDACTED***"), # OpenAI
(r"sk-ant-[a-zA-Z0-9-]{95}", "sk-ant-***REDACTED***"), # Anthropic
(r"AKIA[A-Z0-9]{16}", "AKIA***REDACTED***"), # AWS
(r"ghp_[a-zA-Z0-9]{36}", "ghp_***REDACTED***"), # GitHub
(r"password\s*[:=]\s*\S+", "password: ***REDACTED***"),
(r"token\s*[:=]\s*\S+", "token: ***REDACTED***"),
]
def mask(self, text: str) -> str:
for pattern, replacement in self.PATTERNS:
text = re.sub(pattern, replacement, text, flags=re.IGNORECASE)
return text
# Integrate with logging
class MaskedLogger:
def __init__(self, masker: SecretMasker):
self.masker = masker
def info(self, msg: str, **kwargs):
masked_msg = self.masker.mask(msg)
masked_kwargs = {k: self.masker.mask(str(v)) for k, v in kwargs.items()}
logger.info(masked_msg, **masked_kwargs)
Effort: Low
Verdict: YES - Essential for production.
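Two of the patterns above, exercised on a synthetic log line (the ghp_ token here is obviously fake; real key formats vary and the regexes should be verified against current provider documentation):

```python
import re

# Illustrative subset of SecretMasker.PATTERNS.
PATTERNS = [
    (r"ghp_[A-Za-z0-9]{36}", "ghp_***REDACTED***"),      # GitHub PAT shape
    (r"password\s*[:=]\s*\S+", "password: ***REDACTED***"),
]

def mask(text: str) -> str:
    for pattern, replacement in PATTERNS:
        text = re.sub(pattern, replacement, text, flags=re.IGNORECASE)
    return text

log_line = "auth with password=hunter2 and token ghp_" + "a" * 36
print(mask(log_line))
```

Masking at the logger boundary (as `MaskedLogger` does) means every call site is covered, including exception messages that happen to embed credentials.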
What it is: Separate output formatting from orchestration logic.
Current BLACKICE approach: CLI output only.
Why adopt: Same orchestration → different outputs (console, JSON, dashboard).
Implementation sketch:
from abc import ABC, abstractmethod
class OutputFormatter(ABC):
@abstractmethod
def task_started(self, task: Task): pass
@abstractmethod
def iteration_complete(self, iteration: int, result: IterationResult): pass
@abstractmethod
def task_complete(self, result: TaskResult): pass
class ConsoleFormatter(OutputFormatter):
def task_started(self, task: Task):
print(f"🚀 Starting: {task.name}")
def iteration_complete(self, iteration: int, result: IterationResult):
status = "✅" if result.success else "⏳"
print(f" {status} Iteration {iteration}: {result.summary}")
def task_complete(self, result: TaskResult):
print(f"🏁 Complete: {result.status}")
class JSONFormatter(OutputFormatter):
def task_started(self, task: Task):
print(json.dumps({"event": "started", "task": task.id}))
def iteration_complete(self, iteration: int, result: IterationResult):
print(json.dumps({"event": "iteration", "n": iteration, "success": result.success}))
def task_complete(self, result: TaskResult):
print(json.dumps({"event": "complete", "result": result.__dict__}))
class WebSocketFormatter(OutputFormatter):
def __init__(self, ws: WebSocket):
self.ws = ws
async def task_started(self, task: Task):
await self.ws.send_json({"event": "started", "task": task.id})
Effort: Low
Verdict: YES - Clean separation of concerns.
Why skip: BLACKICE is Python. Don't fragment the codebase.
Why skip: BLACKICE's simpler adapter pattern is sufficient.
| Feature | Worth Adopting? | Effort | Priority |
|---|---|---|---|
| Completion Marker Detection | YES | Low | High |
| Security Masking | YES | Low | High |
| Scratchpad Persistence | YES | Low | Medium |
| Adaptive Permissions | YES | Medium | Medium |
| Output Formatter Abstraction | YES | Low | Low |
<!-- Source Gist 18 of 19: 4442ce070009cc6674820a517b64a8a3 -->
Oh-My-OpenCode Ideas for BLACKICE
Ideas from Oh-My-OpenCode ("Sisyphus") for BLACKICE.
A plugin harness for OpenCode enabling coordinated multi-agent workflows with specialized agents and curated tools.
| Aspect | Oh-My-OpenCode | BLACKICE |
|---|---|---|
| Focus | Agent orchestration with role-based models | Iterate-until-success with consensus |
| Platform | OpenCode plugin | Python CLI |
| Agents | Oracle, Frontend Engineer, Librarian, Explorer | Supervisor, Consensus, Workers |
| Model Routing | Role-based (GPT for strategy, Gemini for visual) | LLMRouter based on task type |
- Role-Based Model Assignment - Different models for different tasks
- LSP/AST-Driven Tools - Surgical refactoring, not naive text manipulation
- Background Agent Delegation - Reduce main agent context overhead
- Todo-Driven Enforcement - Force continuation if agents quit halfway
- MCP Integration - External tools without bloating prompts
What it is: Assign models by purpose, not just "pick the smartest."
Current BLACKICE approach: LLMRouter selects based on task complexity.
Why adopt: Different models excel at different things. Claude for architecture, GPT for strategic thinking, Ollama for fast iteration.
Implementation sketch:
@dataclass
class AgentRole:
name: str
purpose: str
preferred_model: str
fallback_models: list[str]
AGENT_ROLES = {
"architect": AgentRole(
name="Architect",
purpose="System design and high-level decisions",
preferred_model="claude-opus-4-5-20251101",
fallback_models=["gpt-4o", "claude-sonnet-4-20250514"]
),
"implementer": AgentRole(
name="Implementer",
purpose="Write and modify code",
preferred_model="claude-sonnet-4-20250514",
fallback_models=["ollama/qwen2.5-coder:32b"]
),
"reviewer": AgentRole(
name="Reviewer",
purpose="Code review and security audit",
preferred_model="gpt-4o",
fallback_models=["claude-sonnet-4-20250514"]
),
"explorer": AgentRole(
name="Explorer",
purpose="Codebase analysis and documentation",
preferred_model="ollama/qwen2.5-coder:7b", # Fast, cheap
fallback_models=["claude-sonnet-4-20250514"]
),
}
class RoleBasedRouter:
async def route(self, task: Task, role: str) -> str:
agent_role = AGENT_ROLES[role]
for model in [agent_role.preferred_model] + agent_role.fallback_models:
if await self.is_available(model):
return model
raise NoModelAvailable(f"No model available for role {role}")
Effort: Low - extends existing LLMRouter
Verdict: YES - More nuanced than simple complexity-based routing.
What it is: Use Language Server Protocol and AST tools for surgical code changes.
Current BLACKICE approach: Text-based code generation.
Why adopt: Deterministic, safer transformations. Don't break code with regex.
Implementation sketch:
from ast_grep_py import SgRoot
class ASTRefactorer:
"""Surgical code refactoring using AST patterns."""
def rename_function(self, file_path: Path, old_name: str, new_name: str):
"""Rename function across file using AST."""
code = file_path.read_text()
root = SgRoot(code, "python").root()
# Find all call sites of the old function
matches = root.find_all(pattern=f"{old_name}($$$ARGS)")
# Replace with new name
for match in matches:
# Safe replacement preserving structure
pass
def extract_method(self, file_path: Path, start_line: int, end_line: int, new_name: str):
"""Extract lines into new method with proper imports."""
# Use LSP to find dependencies
# Generate method with correct signature
pass
class LSPClient:
"""Language Server Protocol client for code intelligence."""
async def find_references(self, file: Path, line: int, col: int) -> list[Location]:
"""Find all references to symbol."""
pass
async def get_definition(self, file: Path, line: int, col: int) -> Location:
"""Jump to definition."""
pass
async def rename_symbol(self, file: Path, line: int, col: int, new_name: str) -> list[Edit]:
"""Rename symbol across project."""
pass
Effort: Medium-High - requires LSP infrastructure
Verdict: YES - Essential for reliable refactoring.
What it is: Spawn cheap agents to process raw data, main agent works with summaries.
Current BLACKICE approach: Single agent processes everything.
Why adopt: Reduce context consumption. Main agent stays focused.
Implementation sketch:
class BackgroundDelegator:
"""Delegate heavy processing to background agents."""
async def digest_codebase(self, paths: list[Path]) -> str:
"""Have background agent summarize codebase."""
# Spawn cheap Ollama agent
background_agent = Agent(
model="ollama/qwen2.5-coder:7b",
purpose="Summarize code files"
)
summaries = []
for path in paths:
code = path.read_text()
summary = await background_agent.run(
f"Summarize this file in 2-3 sentences:\n{code}"
)
summaries.append(f"## {path}\n{summary}")
return "\n\n".join(summaries)
async def research_topic(self, topic: str) -> str:
"""Have background agent do web research."""
research_agent = Agent(
model="ollama/qwen2.5:7b",
tools=["web_search", "fetch_url"]
)
findings = await research_agent.run(
f"Research {topic} and provide a summary with key points."
)
return findings
# Main agent usage
async def solve_task(task: Task):
# Background agent digests codebase
codebase_summary = await delegator.digest_codebase(task.relevant_files)
# Main agent works with summary, not raw code
main_agent = Agent(model="claude-sonnet-4-20250514")
    result = await main_agent.run(
        f"Task: {task.description}\n\nCodebase context:\n{codebase_summary}"
    )
    return result

Effort: Medium - new delegation pattern
Verdict: YES - Token efficiency is critical for long tasks.
What it is: Force agents to continue if they quit halfway.
Current BLACKICE approach: Ralph Loop retries on failure, but not on premature quit.
Why adopt: Agents sometimes give up too early. Force completion.
Implementation sketch:
class ContinuationEnforcer:
"""Ensure agents complete their work."""
QUIT_PATTERNS = [
"I cannot complete this",
"This is beyond my capabilities",
"I'll stop here",
"Let me know if you need",
]
def detect_premature_quit(self, response: str) -> bool:
"""Check if agent quit prematurely."""
for pattern in self.QUIT_PATTERNS:
if pattern.lower() in response.lower():
return True
return False
    async def enforce_continuation(self, agent: Agent, task: Task, response: str) -> str:
        """If the agent quit, push it to continue."""
        if not self.detect_premature_quit(response):
            return response
        # Use an f-string: str.format cannot evaluate slices like response[-200:]
        continuation_prompt = f"""
You stopped before completing the task. This is not acceptable.
Original task: {task.description}
Your incomplete response ended with: "{response[-200:]}"
Continue from where you left off. Complete the task fully.
Do not apologize. Do not explain limitations. Just do the work.
"""
        return await agent.run(continuation_prompt)

Effort: Low
Verdict: YES - Prevents wasted iterations.
What it is: Use screenshots instead of raw code for UI work.
Current BLACKICE approach: Text-only context.
Why adopt: Screenshots can convey layout faster than code. Saves tokens.
Implementation sketch:
import subprocess
from pathlib import Path
from uuid import uuid4

class MultimodalContext:
    """Use images to reduce text context."""

    async def capture_ui_state(self, url: str) -> Path:
        """Capture a screenshot of a web UI via the Playwright CLI."""
        screenshot_path = Path(f"/tmp/ui-{uuid4()}.png")
        subprocess.run([
            "playwright", "screenshot", url, str(screenshot_path)
        ], check=True)
        return screenshot_path

    async def capture_terminal(self) -> Path:
        """Capture the screen as an image (macOS screencapture; -x mutes the shutter)."""
        screenshot_path = Path(f"/tmp/term-{uuid4()}.png")
        subprocess.run(["screencapture", "-x", str(screenshot_path)], check=True)
        return screenshot_path

    async def analyze_with_vision(self, image: Path, question: str) -> str:
        """Use a vision model to analyze the image."""
        agent = Agent(model="gpt-4o")  # Vision-capable
        return await agent.run_with_image(image, question)

Effort: Low-Medium
Verdict: MAYBE - Useful for UI work, not general coding.
Why skip: BLACKICE should remain framework-agnostic.
Why skip: BLACKICE's skill system is simpler and sufficient.
| Feature | Worth Adopting? | Effort | Priority |
|---|---|---|---|
| Continuation Enforcement | YES | Low | High |
| Role-Based Model Assignment | YES | Low | High |
| Background Agent Delegation | YES | Medium | Medium |
| LSP/AST Refactoring | YES | High | Medium |
| Multimodal Context | MAYBE | Low | Low |
<!-- Source Gist 19 of 19: 9569ccc3aa932d75f19d702b9d945f4c -->
BLACKICE - Complete System Documentation (Context Drop)
A comprehensive context drop for the BLACKICE autonomous software development system.
Last Updated: January 2026
Total Lines: 77,113 Python (source + tests)
Repository: github.com/jmanhype/blackice
- What Is BLACKICE?
- Core Philosophy
- Architecture Overview
- Key Components
- File Structure
- Data Models
- Execution Flow
- Configuration
- Deployment
- API Reference
- Comparison to Alternatives
- Future Work
BLACKICE is a Ralph Loop implementation with multi-agent consensus, crash recovery, and enterprise observability.
One sentence: You describe a task → BLACKICE coordinates multiple LLM agents → they iterate with self-reflection until success → working code is delivered.
| Term | Meaning |
|---|---|
| BLACKICE | Project/repo name |
| Ralph Loop | Core pattern: iterate until success with learning |
| EnterpriseFlywheel | Main orchestrator class (186KB, 4500+ lines) |
| Service Colony | Academic foundation (arXiv:2407.07267) |
Input: "Write a REST API for user authentication"
↓
BLACKICE:
1. Routes task to best LLM (Claude/Ollama/Letta)
2. Spins up agents in isolated git worktrees
3. Agents propose solutions
4. Consensus voting selects best approach
5. SafetyGuard prevents infinite loops
6. CostTracker enforces token/time budgets
7. Beads logs everything for crash recovery
8. Reflexion learns from failures
9. Iterate until validation passes
↓
Output: Working code, committed to repo
From ghuntley.com/ralph:
"Keep trying different approaches until you succeed, learning from each failure."
┌─────────────────────────────────────────────┐
│ RALPH LOOP │
│ │
│ ┌──────┐ ┌──────┐ ┌──────────┐ │
│ │ TRY │───▶│ FAIL │───▶│ REFLECT │ │
│ └──────┘ └──────┘ └────┬─────┘ │
│ ▲ │ │
│ │ ▼ │
│ │ ┌──────────┐ │
│ └─────────────────│ LEARN │ │
│ └──────────┘ │
│ │
│ Until: SUCCESS or MAX_ITERATIONS │
└─────────────────────────────────────────────┘
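In code, the diagram above reduces to a small iterate-reflect-retry function. This is a minimal sketch with illustrative names (`attempt`, `validate`, and `reflect` are assumptions, not the real `loop.py` API):

```python
# Minimal Ralph loop sketch: try, reflect on failure, retry with the
# lesson folded into the next attempt. Names are illustrative.
def ralph_loop(attempt, validate, reflect, max_iterations=10):
    lessons = []
    for _ in range(max_iterations):
        result = attempt(lessons)        # TRY, with accumulated lessons
        if validate(result):             # SUCCESS exits the loop
            return result
        lessons.append(reflect(result))  # FAIL -> REFLECT -> LEARN
    raise RuntimeError("max iterations reached")
```

The real implementation adds checkpointing, budgets, and safety checks around this core.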
Unlike single-agent systems, BLACKICE uses multiple agents voting on solutions:
| Strategy | Description | Use Case |
|---|---|---|
| `majority` | >50% approval | Default for most tasks |
| `supermajority` | >66% approval | Critical changes |
| `unanimous` | 100% approval | Security-sensitive |
| `quorum` | Minimum voters required | Large agent pools |
| `first_n` | First N approvals win | Fast iteration |
| `weighted` | Weighted by agent expertise | Specialist tasks |
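A hedged sketch of how the threshold-based strategies above could be tallied (illustrative helper, not the actual `ConsensusEngine` internals):

```python
# Threshold tally for majority / supermajority / unanimous (illustrative).
THRESHOLDS = {"majority": 0.5, "supermajority": 2 / 3, "unanimous": 1.0}

def tally(votes: list[bool], strategy: str) -> bool:
    """Return True if approvals satisfy the strategy's threshold."""
    if not votes:
        return False
    ratio = sum(votes) / len(votes)
    # unanimous requires exactly 100%; the others require strictly more
    return ratio == 1.0 if strategy == "unanimous" else ratio > THRESHOLDS[strategy]
```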
All state is persisted to Beads (append-only SQLite event store):
Agent crashes mid-task
↓
RecoveryManager reads Beads
↓
Reconstructs state from events
↓
New agent continues from last checkpoint
↓
Task completes successfully
┌─────────────────────────────────────────────────────────────────┐
│ Layer 11: CLI │
│ ralph_cli.py - Command-line interface │
├─────────────────────────────────────────────────────────────────┤
│ Layer 10: Orchestrator │
│ orchestrator.py - High-level task coordination │
├─────────────────────────────────────────────────────────────────┤
│ Layer 9: EnterpriseFlywheel │
│ enterprise_flywheel.py - Unified orchestrator (186KB) │
│ Integrates ALL capabilities into single entry point │
├─────────────────────────────────────────────────────────────────┤
│ Layer 8: Reflexion │
│ reflexion.py - Self-improvement loop │
│ Multi-dimensional quality scoring │
├─────────────────────────────────────────────────────────────────┤
│ Layer 7: Recovery │
│ recovery_manager.py - Crash recovery from Beads │
│ dead_letter_queue.py - Failed task handling │
│ worktree_pool.py - Git worktree isolation │
├─────────────────────────────────────────────────────────────────┤
│ Layer 6: Persistence │
│ beads.py - Append-only SQLite event store (40+ types) │
│ semantic_memory.py - Letta embeddings │
│ artifact_store.py - S3/MinIO storage │
├─────────────────────────────────────────────────────────────────┤
│ Layer 5: Instrumentation │
│ tracer.py - OpenTelemetry distributed tracing │
│ metrics.py - Prometheus counters/histograms │
│ logger.py - Structured JSON logging (structlog) │
│ safety_guard.py - Policy enforcement, loop detection │
│ cost_tracker.py - Token/time budget management │
├─────────────────────────────────────────────────────────────────┤
│ Layer 4: Service Colony │
│ agents/supervisor.py - Task decomposition │
│ agents/consensus.py - 6 voting strategies │
│ agents/mail.py - Inter-agent messaging │
│ agents/registry.py - Capability-based routing │
├─────────────────────────────────────────────────────────────────┤
│ Layer 3: Core Loop │
│ loop.py - Ralph iterate-until-success pattern │
│ retry.py - Exponential backoff with jitter │
│ cancellation.py - Timeout and cancellation │
├─────────────────────────────────────────────────────────────────┤
│ Layer 2: Adapters │
│ adapters/claude_code.py - Claude Code CLI │
│ adapters/claude_proxy.py - Claude via AI Factory │
│ adapters/ollama.py - Local Ollama inference │
│ adapters/letta.py - Persistent memory agents │
│ adapters/codex.py - OpenAI Codex │
├─────────────────────────────────────────────────────────────────┤
│ Layer 1: Dispatcher │
│ dispatcher.py - Route to ai-factory/speckit/LLM │
│ router.py - Smart model selection │
├─────────────────────────────────────────────────────────────────┤
│ Layer 0: Infrastructure │
│ ai-factory/ - Docker Compose (Ollama, Letta, Postgres) │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────┐
│ EnterpriseFlywheel │
│ (Unified Orchestrator) │
└─────────────────┬───────────────────┘
│
┌─────────────────────────────┼─────────────────────────────┐
│ │ │
▼ ▼ ▼
┌───────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ SafetyGuard │ │ CostTracker │ │ SmartRouter │
│ - Policies │ │ - Token budget │ │ - Model select │
│ - Loop detect│ │ - Time budget │ │ - Capability │
└───────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
└─────────────────────────────┼─────────────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ DAGExecutor │
│ (Parallel Workflow Engine) │
└─────────────────┬───────────────────┘
│
┌─────────────────────────────┼─────────────────────────────┐
│ │ │
▼ ▼ ▼
┌───────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ WorktreePool │ │ Consensus │ │ AgentMail │
│ - Git isolate │ │ - 6 strategies │ │ - Request/reply │
│ - Per task │ │ - Vote collect │ │ - Broadcast │
└───────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
└─────────────────────────────┼─────────────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ Adapters Layer │
│ Claude │ Ollama │ Letta │ Codex │
└─────────────────┬───────────────────┘
│
┌─────────────────────────────┼─────────────────────────────┐
│ │ │
▼ ▼ ▼
┌───────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Beads │ │ RecoveryManager │ │ DeadLetterQ │
│ - Event store │ │ - Crash recover │ │ - Failed tasks │
│ - 40+ types │ │ - State replay │ │ - Retry policy │
└───────────────┘ └─────────────────┘ └─────────────────┘
The unified orchestrator that brings everything together. 186KB, 4500+ lines.
from integrations.ralph.enterprise_flywheel import (
EnterpriseFlywheel,
EnterpriseFlywheelConfig,
EnterpriseTask,
)
config = EnterpriseFlywheelConfig(
ollama_url="http://192.168.1.143:11434",
letta_url="http://192.168.1.143:8283",
claude_proxy_url="http://192.168.1.143:42069",
max_iterations=10,
max_tokens_per_task=100_000,
)
flywheel = EnterpriseFlywheel(config)
task = EnterpriseTask(
id="example",
name="Code Generation",
description="Write a function to calculate fibonacci",
)
result = await flywheel.execute_task(task)

Key capabilities:
- LLMRouter for intelligent model selection
- DAGExecutor for parallel workflow execution
- WorktreePool for git worktree isolation per task
- RecoveryManager for crash recovery from Beads events
- DeadLetterQueue for failed task handling with retry
- SafetyGuard for policy enforcement and loop detection
- CostTracker for token/time budget management
- LettaAdapter for persistent memory across sessions
- ReflexionLoop for multi-dimensional quality scoring
Append-only SQLite event store with 40+ event types.
from integrations.ralph.beads import BeadsStore, EventType, EntityType
beads = BeadsStore(Path("~/.beads/beads.db").expanduser())  # Path does not expand ~ on its own
# Emit events
beads.emit(
event_type=EventType.TASK_STARTED,
entity_type=EntityType.TASK,
entity_id="task-123",
data={"description": "Write fibonacci"},
)
# Query events
events = beads.query_by_entity("task-123")
# Replay for recovery
for event in beads.replay_from(checkpoint_id):
    apply_event(event)

Event Types (40+):
- Run: `RUN_STARTED`, `RUN_COMPLETED`, `RUN_FAILED`, `RUN_STATE_TRANSITION`
- Task: `TASK_QUEUED`, `TASK_STARTED`, `TASK_SUCCEEDED`, `TASK_FAILED`, `TASK_RETRY`
- Mail: `MAIL_SENT`, `MAIL_DELIVERED`, `MAIL_ACKED`, `MAIL_EXPIRED`
- Workspace: `GIT_CHECKPOINT_CREATED`, `WORKTREE_ACQUIRED`, `WORKTREE_RELEASED`
- Consensus: `PROPOSAL_CREATED`, `VOTE_CAST`, `CONSENSUS_REACHED`
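The replay example above calls `apply_event` without defining it. One plausible shape is a reducer that folds events into per-task state (dict-based events and state are assumptions for illustration, not the real Beads code):

```python
# Illustrative reducer: fold Beads events into per-task state during replay.
def apply_event(state: dict, event: dict) -> dict:
    task = state.setdefault(event["entity_id"], {"status": "unknown", "retries": 0})
    etype = event["event_type"]
    if etype == "task_started":
        task["status"] = "running"
    elif etype == "task_succeeded":
        task["status"] = "succeeded"
    elif etype == "task_failed":
        task["status"] = "failed"
    elif etype == "task_retry":
        task["retries"] += 1
    return state
```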
6 voting strategies for multi-agent decision making.
from integrations.ralph.agents.consensus import (
ConsensusEngine,
ConsensusStrategy,
Proposal,
VoteValue,
)
engine = ConsensusEngine(strategy=ConsensusStrategy.MAJORITY)
# Create proposal
proposal = engine.create_proposal(
proposer="agent-1",
content={"solution": "Use recursion"},
timeout_seconds=60,
)
# Agents vote
engine.cast_vote(proposal.id, "agent-2", VoteValue.APPROVE, "Clean solution")
engine.cast_vote(proposal.id, "agent-3", VoteValue.APPROVE, "Efficient")
engine.cast_vote(proposal.id, "agent-4", VoteValue.REJECT, "Prefer iteration")
# Check result
result = engine.get_result(proposal.id)
# result.status = ProposalStatus.APPROVED (3 approve > 1 reject)

Strategies:
| Strategy | Rule |
|---|---|
| `MAJORITY` | >50% approve |
| `SUPERMAJORITY` | >66% approve |
| `UNANIMOUS` | 100% approve |
| `QUORUM` | Minimum N voters, then majority |
| `FIRST_N` | First N approvals win |
| `WEIGHTED` | Sum of agent weights crosses a threshold |
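The `WEIGHTED` strategy can be sketched as follows (hypothetical helper; the per-agent weights, default weight of 1.0, and ratio-of-total threshold semantics are assumptions):

```python
# Illustrative weighted vote: each approving agent contributes its weight;
# the proposal passes when approving weight reaches the threshold fraction.
def weighted_vote(votes: dict[str, bool], weights: dict[str, float],
                  threshold: float) -> bool:
    approving = sum(weights.get(agent, 1.0) for agent, ok in votes.items() if ok)
    total = sum(weights.get(agent, 1.0) for agent in votes)
    return total > 0 and approving / total >= threshold
```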
Multi-LLM support - not locked to any single provider.
# Claude via AI Factory proxy
from integrations.ralph.adapters.claude_proxy import ClaudeProxyAdapter
claude = ClaudeProxyAdapter(url="http://192.168.1.143:42069")
# Local Ollama
from integrations.ralph.adapters.ollama import OllamaAdapter
ollama = OllamaAdapter(url="http://192.168.1.143:11434")
# Letta (persistent memory)
from integrations.ralph.adapters.letta import LettaAdapter
letta = LettaAdapter(url="http://192.168.1.143:8283")
# OpenAI Codex
from integrations.ralph.adapters.codex import CodexAdapter
codex = CodexAdapter(api_key="...")

Adapter interface:
class BaseAdapter:
async def execute(self, prompt: str, **kwargs) -> AdapterResult:
"""Execute a prompt and return the result."""
async def health_check(self) -> bool:
"""Check if the adapter is healthy."""
def get_capabilities(self) -> list[str]:
        """Return list of capabilities (coding, reasoning, etc)."""

Enterprise observability with OpenTelemetry, Prometheus, and structlog.
# Tracing
from integrations.ralph.instrumentation.tracer import RalphTracer
tracer = RalphTracer(service_name="blackice")
with tracer.span("execute_task", attributes={"task_id": "123"}):
result = await run_task()
# Metrics
from integrations.ralph.instrumentation.metrics import RalphMetrics
metrics = RalphMetrics(port=9090)
metrics.task_started("task-123")
metrics.tokens_used(1500, model="claude-3-5-sonnet")
metrics.task_completed("task-123", duration_ms=5000)
# Structured Logging
from integrations.ralph.instrumentation.logger import get_logger
log = get_logger("ralph.loop")
log.info("task_started", task_id="123", model="claude")
# Output: {"event": "task_started", "task_id": "123", "model": "claude", "timestamp": "..."}

Policy enforcement and budget management.
# SafetyGuard
from integrations.ralph.instrumentation.safety_guard import SafetyGuard
guard = SafetyGuard(
max_iterations=10,
loop_detection_threshold=3,
allowed_policies=["default"],
)
decision = guard.evaluate(checkpoint=Checkpoint.BEFORE_ITERATION, context={...})
if decision.action == SafetyAction.ABORT:
raise SafetyViolation(decision.reason)
# CostTracker
from integrations.ralph.instrumentation.cost_tracker import CostTracker
tracker = CostTracker(
max_tokens=100_000,
max_time_seconds=600,
)
tracker.record_tokens("task-123", 1500)
if not tracker.can_continue("task-123"):
    raise BudgetExceeded()

blackice/
├── integrations/ralph/ # THE SYSTEM (77K lines)
│ ├── __init__.py # Public API exports
│ ├── enterprise_flywheel.py # Unified orchestrator (186KB)
│ ├── loop.py # Ralph loop core (68KB)
│ ├── beads.py # Event store (28KB)
│ ├── models.py # Data models (17KB)
│ │
│ ├── agents/ # Service Colony patterns
│ │ ├── supervisor.py # Task decomposition (61KB)
│ │ ├── consensus.py # 6 voting strategies (24KB)
│ │ ├── mail.py # Inter-agent messaging (25KB)
│ │ └── registry.py # Capability routing (29KB)
│ │
│ ├── adapters/ # LLM backends
│ │ ├── base.py # Adapter interface
│ │ ├── claude_code.py # Claude Code CLI
│ │ ├── claude_proxy.py # Claude via AI Factory
│ │ ├── ollama.py # Local Ollama
│ │ ├── letta.py # Persistent memory
│ │ └── codex.py # OpenAI Codex
│ │
│ ├── instrumentation/ # Observability (Layer 5)
│ │ ├── tracer.py # OpenTelemetry (10KB)
│ │ ├── metrics.py # Prometheus (15KB)
│ │ ├── logger.py # Structlog (10KB)
│ │ ├── safety_guard.py # Policy enforcement (6KB)
│ │ ├── cost_tracker.py # Budget management (3KB)
│ │ └── fingerprint.py # Loop detection (3KB)
│ │
│ ├── recovery_manager.py # Crash recovery (13KB)
│ ├── dead_letter_queue.py # Failed task handling (13KB)
│ ├── worktree_pool.py # Git isolation (12KB)
│ ├── dag_executor.py # Parallel workflows (31KB)
│ ├── worker_pool.py # Agent workers (26KB)
│ ├── reflexion.py # Self-improvement (23KB)
│ ├── semantic_memory.py # Letta embeddings (22KB)
│ ├── retry.py # Exponential backoff (19KB)
│ ├── agent_mail.py # Messaging (27KB)
│ ├── artifact_store.py # S3 storage (17KB)
│ ├── git_checkpoint.py # Git checkpoints (20KB)
│ ├── cancellation.py # Timeouts (21KB)
│ │
│ ├── cli/ # CLI interface
│ │ └── ...
│ │
│ ├── tests/ # Test suite (33 files)
│ │ ├── test_enterprise_flywheel.py
│ │ ├── test_consensus.py
│ │ ├── test_beads.py
│ │ └── ... (30 more)
│ │
│ └── config/ # Configuration
│ └── ...
│
├── ai-factory/ # Docker infrastructure (submodule)
│ ├── docker-compose.yml # Container definitions
│ ├── litellm-config.yaml # LLM routing
│ └── llmrouter/ # Model selection service
│
├── specs/service-colony/ # SpecKit documentation
│ ├── spec.md # WHAT/WHY
│ ├── plan.md # HOW
│ ├── tasks.md # 51 implementation tasks
│ ├── checklist.md # Quality gates (73%)
│ ├── deployment.md # AI Factory setup
│ ├── troubleshooting.md # Common issues
│ └── whitepaper.md # Technical white paper
│
└── .claude/skills/ # Claude Code skills
class RunState(str, Enum):
"""State machine for task execution."""
INIT = "init"
PLANNING = "planning"
RUNNING = "running"
CHECKPOINTING = "checkpointing"
SUCCEEDED = "succeeded"
FAILED = "failed"
ABORTED = "aborted"
PAUSED = "paused"
RESUMING = "resuming"
ROLLING_BACK = "rolling_back"
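The `RunState` values imply a transition graph. As a hedged illustration (the allowed-transitions table below is an assumption, not taken from `models.py`), a guard could validate transitions like this:

```python
# Hypothetical allowed-transitions table for the RunState machine.
ALLOWED_TRANSITIONS = {
    "init": {"planning"},
    "planning": {"running", "aborted"},
    "running": {"checkpointing", "succeeded", "failed", "paused", "rolling_back"},
    "checkpointing": {"running"},
    "paused": {"resuming", "aborted"},
    "resuming": {"running"},
    "rolling_back": {"failed", "running"},
}

def can_transition(src: str, dst: str) -> bool:
    """Terminal states (succeeded/failed/aborted) have no outgoing edges."""
    return dst in ALLOWED_TRANSITIONS.get(src, set())
```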
class TaskType(str, Enum):
"""Task classification for routing."""
CODING = "coding"
PLANNING = "planning"
REASONING = "reasoning"
GENERATION = "generation"
VALIDATION = "validation"
class AttemptOutcome(str, Enum):
"""Result of an execution attempt."""
SUCCESS = "success"
FAILURE = "failure"
ERROR = "error"
TIMEOUT = "timeout"
@dataclass
class LoopConfig:
"""Configuration for Ralph Loop."""
max_iterations: int = 100
memory_agent_id: str = "agent-xxx"
default_model: str = "qwen2.5-coder:7b"
validation_timeout: int = 30
refinement_mode: Literal["auto", "manual", "disabled"] = "auto"
@dataclass
class EnterpriseTask:
"""A task to be executed by the flywheel."""
id: str
name: str
description: str
task_type: TaskType = TaskType.CODING
priority: int = 5
dependencies: list[str] = field(default_factory=list)
    metadata: dict[str, Any] = field(default_factory=dict)

class EventType(str, Enum):
"""40+ event types for Beads."""
# Run lifecycle
RUN_STARTED = "run_started"
RUN_STATE_TRANSITION = "run_state_transition"
RUN_COMPLETED = "run_completed"
RUN_FAILED = "run_failed"
# Task lifecycle
TASK_QUEUED = "task_queued"
TASK_STARTED = "task_started"
TASK_SUCCEEDED = "task_succeeded"
TASK_FAILED = "task_failed"
TASK_RETRY = "task_retry"
# Consensus
PROPOSAL_CREATED = "proposal_created"
VOTE_CAST = "vote_cast"
CONSENSUS_REACHED = "consensus_reached"
# Workspace
GIT_CHECKPOINT_CREATED = "git_checkpoint_created"
WORKTREE_ACQUIRED = "worktree_acquired"
WORKTREE_RELEASED = "worktree_released"
    # ... 30+ more

1. CLI receives task
└── ralph run "Write fibonacci function"
2. EnterpriseFlywheel.execute_task()
├── SafetyGuard.evaluate(START_OF_RUN)
├── CostTracker.start_tracking()
└── WorktreePool.acquire()
3. SmartRouter.select_model()
├── Analyze task type (coding)
├── Check adapter health
└── Return: "claude-3-5-sonnet"
4. DAGExecutor.execute()
├── Create execution graph
└── Run nodes in parallel where possible
5. For each iteration:
├── SafetyGuard.evaluate(BEFORE_ITERATION)
├── Adapter.execute(prompt)
├── Beads.emit(TASK_PROGRESS)
├── Validator.validate(result)
└── If failed: Reflexion.reflect() → refine prompt
6. Consensus (if multi-agent):
├── ConsensusEngine.create_proposal()
├── Agents cast votes
└── ConsensusEngine.get_result()
7. On success:
├── Beads.emit(TASK_SUCCEEDED)
├── WorktreePool.release()
├── SemanticMemory.store_success()
└── Return FlywheelResult
8. On failure:
├── Beads.emit(TASK_FAILED)
├── DeadLetterQueue.enqueue()
└── Return error
1. Agent crashes mid-task
2. New agent starts
└── RecoveryManager.create_recovery_plan()
3. RecoveryManager:
├── Query Beads for last run
├── Find last checkpoint
└── Build recovery plan
4. EnterpriseFlywheel.recover()
├── For completed subtasks: skip
├── For pending subtasks: execute
└── For failed subtasks: retry or DLQ
5. Continue from checkpoint
└── Task completes
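Steps 3-5 above amount to partitioning subtasks by their last recorded status. A minimal sketch (the status strings and plan shape are assumptions, not the real `RecoveryManager` API):

```python
# Illustrative recovery plan: partition subtasks by last known status,
# as reconstructed from Beads events.
def build_recovery_plan(task_states: dict[str, str]) -> dict[str, list[str]]:
    plan = {"skip": [], "execute": [], "retry_or_dlq": []}
    for task_id, status in sorted(task_states.items()):
        if status == "succeeded":
            plan["skip"].append(task_id)          # completed: skip
        elif status == "failed":
            plan["retry_or_dlq"].append(task_id)  # failed: retry or DLQ
        else:
            plan["execute"].append(task_id)       # pending/running: execute
    return plan
```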
# Adapters
RALPH_CLAUDE_PROXY_URL=http://192.168.1.143:42069
RALPH_OLLAMA_URL=http://192.168.1.143:11434
RALPH_LETTA_URL=http://192.168.1.143:8283
# Limits
RALPH_MAX_ITERATIONS=100
RALPH_MAX_TOKENS=100000
RALPH_MAX_TIME_SECONDS=600
# Safety
RALPH_LOOP_DETECTION_THRESHOLD=3
RALPH_ALLOWED_POLICIES=default
# Memory
RALPH_MEMORY_AGENT_ID=agent-xxx
RALPH_BEADS_PATH=~/.beads/beads.db
# Observability
RALPH_TRACING_ENABLED=true
RALPH_METRICS_PORT=9090
RALPH_LOG_LEVEL=INFO
RALPH_LOG_JSON=true

adapters:
claude_proxy:
url: "http://192.168.1.143:42069"
default_model: "claude-3-5-haiku-20241022"
timeout: 120
ollama:
url: "http://192.168.1.143:11434"
default_model: "qwen2.5-coder:7b"
letta:
url: "http://192.168.1.143:8283"
agent_id: "agent-xxx"
safety:
max_iterations: 10
loop_detection_threshold: 3
allowed_policies:
- default
cost:
max_tokens_per_task: 100000
max_time_per_task_seconds: 600
observability:
tracing_enabled: true
tracing_console_export: true
metrics_enabled: true
metrics_port: 9090
logging_enabled: true
logging_json: true
worktree:
base_path: /tmp/ralph-worktrees
  max_pool_size: 10

cd ai-factory
cp .env.template .env
# Edit .env with API keys
docker compose up -d
# Services:
# - postgres-vector:5432 (vector database)
# - ollama:11434 (local LLM inference)
# - letta-server:8283 (persistent memory agents)
# - litellm:4000 (multi-provider proxy)
# - llmrouter:4001 (intelligent model selection)
# Pull models
ollama pull qwen2.5-coder:7b

# Install dependencies
pip install -e integrations/ralph
# Configure
mkdir -p ~/.ralph
cp config.example.yaml ~/.ralph/config.yaml
# Edit with your endpoints
# Run a task
python -m integrations.ralph.cli run "Write hello world in Python"
# Check status
python -m integrations.ralph.cli status
# View dashboard
python -m integrations.ralph.cli dashboard

# Check all services
curl -s http://192.168.1.143:11434/api/tags && echo "Ollama OK"
curl -s http://192.168.1.143:8283/v1/health && echo "Letta OK"
curl -s http://192.168.1.143:42069/v1/models && echo "Claude Proxy OK"

# Run a task
ralph run "Write a REST API for users"
ralph run --model claude "Complex reasoning task"
ralph run --parallel 3 "Generate test cases"
# Status and monitoring
ralph status # Current task status
ralph dashboard # TUI monitoring
ralph logs --tail 100 # Recent logs
# Dead Letter Queue
ralph dlq list # View failed tasks
ralph dlq retry <task_id> # Retry a task
ralph dlq purge --expired # Clean up
# Recovery
ralph recover # Recover from crash
ralph recover --from-checkpoint <id>

from integrations.ralph import (
EnterpriseFlywheel,
EnterpriseFlywheelConfig,
EnterpriseTask,
TaskType,
)
# Initialize
config = EnterpriseFlywheelConfig(...)
flywheel = EnterpriseFlywheel(config)
# Execute task
task = EnterpriseTask(
id="task-1",
name="Generate Code",
description="Write a fibonacci function",
task_type=TaskType.CODING,
)
result = await flywheel.execute_task(task)
# Check result
if result.success:
print(f"Output: {result.output}")
print(f"Files: {result.files_changed}")
else:
print(f"Failed: {result.error}")
# Recovery
plan = await flywheel.recover()
print(f"Recovered {len(plan.completed)} tasks")

| Aspect | BLACKICE | Gas Town |
|---|---|---|
| Core Pattern | Ralph Loop | MEOW (molecular workflows) |
| Language | Python (77K) | Go (75K) |
| Decision Making | 6 consensus strategies | Mayor decides |
| LLM Support | Claude, Ollama, Letta, Codex | Claude Code only |
| Observability | OpenTelemetry, Prometheus | Activity feeds |
| UI | CLI | tmux (visual) |
| Recovery | Beads event replay | GUPP + hooks |
| Aspect | BLACKICE | Dapr Agents |
|---|---|---|
| Target | Single GPU server | Kubernetes scale |
| Consensus | 6 strategies | 3 modes |
| State | SQLite (Beads) | Pluggable stores |
| Complexity | Simpler deployment | Sidecar + control plane |
| Maturity | Custom code | CNCF graduated |
| Aspect | BLACKICE | CrewAI / AutoGen |
|---|---|---|
| Focus | Code generation | General agents |
| Consensus | Built-in voting | None |
| Recovery | Beads event store | None |
| Observability | Full stack | Basic logging |
| Git Integration | Worktree isolation | None |
| Feature | Priority | Effort |
|---|---|---|
| Convoys (work bundling) | High | Low |
| GUPP (hook propulsion) | High | Medium |
| Patrol Agents (self-healing) | High | Medium |
| MEOW (workflow DSL) | Medium | High |
| tmux UI | Low | Medium |
- Convoys - Bundle related tasks for tracking
- GUPP - Simplify crash recovery with hooks
- Patrol Agents - Self-healing background workers
- MEOW - Workflow DSL (Formulas → Molecules)
- Federation - Remote workers on cloud
- Web UI - Visual dashboard
Generated: January 2026