@jmanhype
Created March 25, 2026 16:36

BLACKICE Research Ideas - Consolidated

Consolidated from 19 tool/project analyses (Jan 7-8, 2026). Each section contains ideas worth adopting for BLACKICE from a different open-source tool.



<!-- Source Gist 1 of 19: 2b8159ee806769c1358481bc20b2c70b -->

Agentic Coding Flywheel Setup (ACFS) Ideas for BLACKICE

Ideas from ACFS that could improve BLACKICE.

What is ACFS?

ACFS transforms a fresh Ubuntu VPS into a fully-configured AI development environment in ~30 minutes via a single command.

| Aspect | ACFS | BLACKICE |
| --- | --- | --- |
| Focus | Bootstrap AI dev environment | Run autonomous coding tasks |
| Platform | Ubuntu VPS | Any (Python) |
| Pattern | Manifest → Generate → Install | Ralph Loop + Consensus |
| State | `~/.acfs/state.json` | Beads event store |
| Verification | `acfs doctor` | No unified health check |

Key Features

  1. Manifest-Driven Generation - YAML defines all tools, TypeScript generates installers
  2. Idempotent Installation - Safe re-runs, resume from interruption
  3. Security Verification - SHA256 checksums for all upstream scripts
  4. Doctor Health Checks - Single command verifies entire stack
  5. Modular Categories - 11 installer categories, independently testable

Ideas Worth Adopting

1. Manifest-Driven Agent Registry

What it is: Single YAML file defines all agents, their capabilities, models, and verification commands.

Current BLACKICE approach: Hardcoded adapters in Python code.

Why adopt: Change agent config without code changes. Generate docs, CLI help, and validation from one source.

Implementation sketch:

# blackice-manifest.yaml
version: "1.0"

agents:
  claude-coder:
    description: "Primary coding agent using Claude"
    adapter: claude_proxy
    model: claude-sonnet-4-20250514
    capabilities:
      - code_generation
      - code_review
      - refactoring
    verification:
      command: "curl -s http://localhost:42069/health"
      expected: "ok"
    config:
      max_tokens: 8192
      temperature: 0.7

  ollama-fast:
    description: "Fast local inference for iteration"
    adapter: ollama
    model: qwen2.5-coder:7b
    capabilities:
      - code_generation
      - quick_iteration
    verification:
      command: "curl -s http://localhost:11434/api/tags | jq '.models | length'"
      expected_min: 1
    config:
      max_tokens: 4096
      temperature: 0.3

  letta-memory:
    description: "Long-term memory agent"
    adapter: letta
    capabilities:
      - semantic_memory
      - cross_session_learning
    verification:
      command: "curl -s http://localhost:8283/v1/health"
      expected: "ok"

consensus:
  strategies:
    - majority
    - supermajority
    - unanimous
  default: majority
  quorum_size: 3

infrastructure:
  beads_db: "~/.beads/beads.db"
  worktree_base: "/tmp/ralph-worktrees"
  log_level: INFO

# Generate from manifest
from pathlib import Path
import yaml

def load_manifest(path: Path = Path("blackice-manifest.yaml")) -> dict:
    return yaml.safe_load(path.read_text())

def generate_agent_registry(manifest: dict) -> str:
    """Generate Python code for agent registry."""
    code = ["# AUTO-GENERATED from blackice-manifest.yaml", ""]
    code.append("AGENTS = {")
    for name, config in manifest["agents"].items():
        code.append(f"    '{name}': {{")
        code.append(f"        'adapter': '{config['adapter']}',")
        # Not every agent declares a model (e.g. letta-memory above),
        # so use .get() rather than indexing
        code.append(f"        'model': {config.get('model')!r},")
        code.append(f"        'capabilities': {config['capabilities']},")
        code.append("    },")
    code.append("}")
    return "\n".join(code)
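Since the generated registry is only as good as the manifest, a validation pass could run before generation. A minimal sketch under stated assumptions: the `validate_manifest` helper and its rules are illustrative, not part of ACFS or BLACKICE, but the field names mirror the manifest above.

```python
def validate_manifest(manifest: dict) -> list[str]:
    """Return a list of human-readable problems; an empty list means valid.

    Hypothetical helper: checks only the fields the sketches above rely on.
    """
    problems = []
    for name, config in manifest.get("agents", {}).items():
        if "adapter" not in config:
            problems.append(f"agent '{name}' is missing an adapter")
        if "verification" not in config:
            problems.append(f"agent '{name}' has no verification command")
    consensus = manifest.get("consensus", {})
    default = consensus.get("default")
    if default and default not in consensus.get("strategies", []):
        problems.append(f"default consensus '{default}' not in strategies")
    return problems
```

Running this at startup (or in `blackice doctor`, below) would turn a silent misconfiguration into an actionable error list.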

Effort: Medium - restructure config loading

Verdict: YES - Single source of truth is powerful.


2. blackice doctor Health Command

What it is: Single command that verifies entire stack is operational.

Current BLACKICE approach: Must check each service manually.

Why adopt: One command to answer "is everything working?"

Implementation sketch:

import asyncio
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Literal

@dataclass
class HealthCheck:
    name: str
    status: Literal["pass", "fail", "warn"]
    message: str
    latency_ms: float | None = None

class DoctorCommand:
    """Unified health check for BLACKICE stack."""

    def __init__(self, manifest: dict):
        self.manifest = manifest

    async def check_all(self) -> list[HealthCheck]:
        checks = []

        # Check all agents from manifest
        for name, config in self.manifest["agents"].items():
            check = await self._check_agent(name, config)
            checks.append(check)

        # Check infrastructure
        checks.append(await self._check_beads())
        checks.append(await self._check_worktrees())

        return checks

    async def _check_agent(self, name: str, config: dict) -> HealthCheck:
        verification = config.get("verification", {})
        command = verification.get("command")
        expected = verification.get("expected")

        if not command:
            return HealthCheck(name, "warn", "No verification command defined")

        try:
            start = asyncio.get_event_loop().time()
            result = subprocess.run(
                command, shell=True, capture_output=True, timeout=5
            )
            latency = (asyncio.get_event_loop().time() - start) * 1000

            output = result.stdout.decode().strip()
            if expected and output == expected:
                return HealthCheck(name, "pass", f"Healthy ({latency:.0f}ms)", latency)
            elif result.returncode == 0:
                return HealthCheck(name, "pass", f"Running ({latency:.0f}ms)", latency)
            else:
                return HealthCheck(name, "fail", result.stderr.decode()[:100])
        except Exception as e:
            return HealthCheck(name, "fail", str(e)[:100])

    async def _check_beads(self) -> HealthCheck:
        db_path = Path(self.manifest["infrastructure"]["beads_db"]).expanduser()
        if db_path.exists():
            size_mb = db_path.stat().st_size / 1024 / 1024
            return HealthCheck("beads", "pass", f"OK ({size_mb:.1f} MB)")
        return HealthCheck("beads", "fail", f"Database not found: {db_path}")

# CLI usage
# $ blackice doctor
# ┌─────────────────────────────────────────────────────┐
# │ BLACKICE Health Check                               │
# ├──────────────┬────────┬─────────────────────────────┤
# │ Component    │ Status │ Details                     │
# ├──────────────┼────────┼─────────────────────────────┤
# │ claude-coder │ ✓ PASS │ Healthy (45ms)              │
# │ ollama-fast  │ ✓ PASS │ Running (12ms)              │
# │ letta-memory │ ✓ PASS │ Healthy (23ms)              │
# │ beads        │ ✓ PASS │ OK (156.2 MB)               │
# │ worktrees    │ ✓ PASS │ 3 active, 12 available      │
# └──────────────┴────────┴─────────────────────────────┘
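The boxed report above could come from a table library such as rich; a dependency-free sketch of the same rendering (attribute names follow the `HealthCheck` dataclass above):

```python
# Map each HealthCheck.status to a display marker
STATUS_MARK = {"pass": "✓ PASS", "fail": "✗ FAIL", "warn": "! WARN"}

def render_report(checks) -> str:
    """Plain-text rendering of a list of HealthCheck-like objects."""
    lines = [f"{'Component':<15}{'Status':<9}Details"]
    for check in checks:
        lines.append(
            f"{check.name:<15}{STATUS_MARK[check.status]:<9}{check.message}"
        )
    return "\n".join(lines)
```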

Effort: Low - straightforward implementation

Verdict: YES - Essential for operations.


3. Idempotent Setup/Deploy

What it is: Setup commands that are safe to re-run. Interrupted runs resume.

Current BLACKICE approach: Manual setup, no state tracking.

Why adopt: Reliable deployment. Don't break things on re-run.

Implementation sketch:

import json
from dataclasses import asdict, dataclass
from datetime import datetime
from pathlib import Path
from typing import Callable

@dataclass
class SetupState:
    completed_steps: list[str]
    last_step: str | None
    started_at: datetime
    completed_at: datetime | None

class IdempotentSetup:
    """Setup that tracks progress and resumes safely."""

    STATE_FILE = Path("~/.blackice/setup-state.json").expanduser()

    def __init__(self):
        self.state = self._load_state()

    def _load_state(self) -> SetupState:
        if self.STATE_FILE.exists():
            data = json.loads(self.STATE_FILE.read_text())
            return SetupState(**data)
        return SetupState([], None, datetime.now(), None)

    def _save_state(self):
        self.STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
        # default=str keeps the datetime fields JSON-serializable
        self.STATE_FILE.write_text(json.dumps(asdict(self.state), default=str))

    async def run_step(self, step_id: str, action: Callable):
        """Run step only if not already completed."""
        if step_id in self.state.completed_steps:
            print(f"⏭️  Skipping {step_id} (already done)")
            return

        print(f"▶️  Running {step_id}...")
        self.state.last_step = step_id
        self._save_state()

        try:
            await action()
            self.state.completed_steps.append(step_id)
            self._save_state()
            print(f"✓  Completed {step_id}")
        except Exception as e:
            print(f"✗  Failed {step_id}: {e}")
            raise

# Usage
setup = IdempotentSetup()
await setup.run_step("install_ollama", install_ollama)
await setup.run_step("pull_models", pull_models)
await setup.run_step("init_beads", init_beads)
await setup.run_step("create_worktrees", create_worktrees)

Effort: Low - simple state file

Verdict: YES - Professional deployment experience.


4. SHA256 Verification for External Resources

What it is: Verify checksums of any downloaded scripts/models before execution.

Current BLACKICE approach: Trust upstream sources.

Why adopt: Defense in depth. Catch supply chain attacks.

Implementation sketch:

# checksums.yaml
resources:
  ollama-install:
    url: "https://ollama.com/install.sh"
    sha256: "abc123..."

  litellm-config:
    url: "https://raw.githubusercontent.com/.../litellm.yaml"
    sha256: "def456..."

# downloader.py
import hashlib
from pathlib import Path

import httpx
import yaml

class SecurityError(Exception):
    """Raised when a downloaded resource fails checksum verification."""

class VerifiedDownloader:
    def __init__(self, checksums_file: Path):
        self.checksums = yaml.safe_load(checksums_file.read_text())

    async def download(self, resource_id: str) -> bytes:
        resource = self.checksums["resources"][resource_id]
        url = resource["url"]
        expected_sha = resource["sha256"]

        async with httpx.AsyncClient() as client:
            response = await client.get(url)
            content = response.content

        actual_sha = hashlib.sha256(content).hexdigest()
        if actual_sha != expected_sha:
            raise SecurityError(
                f"Checksum mismatch for {resource_id}!\n"
                f"Expected: {expected_sha}\n"
                f"Got:      {actual_sha}\n"
                f"Possible supply chain attack!"
            )

        return content
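Populating `checksums.yaml` means pinning each digest once, at review time; a tiny stdlib-only helper for that step (the `pin_checksum` name is illustrative):

```python
import hashlib

def pin_checksum(content: bytes) -> str:
    """Return the sha256 hex digest to record for a reviewed resource."""
    return hashlib.sha256(content).hexdigest()
```

The digest gets committed alongside the URL, so any later drift in the upstream file fails loudly in `VerifiedDownloader`.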

Effort: Low

Verdict: YES - Security best practice.


5. Code Generation from Config

What it is: Generate CLI handlers, documentation, and boilerplate from manifest.

Current BLACKICE approach: Hand-written CLI.

Why adopt: Consistency. Change manifest → CLI updates automatically.

Implementation sketch:

def generate_cli_commands(manifest: dict) -> str:
    """Generate Click CLI from manifest."""
    code = [
        "# AUTO-GENERATED - do not edit",
        "import click",
        "",
        "@click.group()",
        "def cli():",
        '    """BLACKICE - Autonomous Coding System"""',
        "    pass",
        "",
    ]

    # Generate command for each agent
    for name, config in manifest["agents"].items():
        code.append(f"@cli.command()")
        code.append(f'@click.option("--prompt", required=True)')
        code.append(f"def {name.replace('-', '_')}(prompt: str):")
        code.append(f'    """Run task using {config["description"]}"""')
        code.append(f'    run_agent("{name}", prompt)')
        code.append("")

    return "\n".join(code)

# Generate: python -m blackice.codegen
# Output: integrations/ralph/cli_generated.py

Effort: Medium - requires build step

Verdict: MAYBE - Nice but not essential.


Ideas NOT Worth Adopting

Ubuntu-Only Installation

Why skip: BLACKICE should remain cross-platform. Docker handles platform abstraction.

Wizard-Style Setup

Why skip: Enterprise users prefer CLI/IaC. Wizard is good for beginners but BLACKICE targets developers.

30+ Tool Installation

Why skip: BLACKICE is focused. Don't bundle unrelated dev tools.


Summary

| Feature | Worth Adopting? | Effort | Priority |
| --- | --- | --- | --- |
| `blackice doctor` | YES | Low | High |
| Manifest-Driven Registry | YES | Medium | High |
| Idempotent Setup | YES | Low | Medium |
| SHA256 Verification | YES | Low | Medium |
| Code Generation | MAYBE | Medium | Low |


<!-- Source Gist 2 of 19: de7863549ca0366c5fdaa6683f07d595 -->

MassGen Ideas Worth Adopting for BLACKICE

Ideas from MassGen that could improve BLACKICE.

What is MassGen?

MassGen is a terminal-based multi-agent scaling system that orchestrates frontier models to collaborate like a "parallel study group."

| Aspect | MassGen | BLACKICE |
| --- | --- | --- |
| Focus | Parallel reasoning convergence | Iterate-until-success with consensus |
| Pattern | Study group (observe & refine) | Ralph Loop + voting |
| Platform | Terminal (Python) | Python CLI |
| Model Support | 15+ providers | Claude, Ollama, Letta |
| Coordination | Notification hub | Message broker + consensus |
| State | JSON status files | Beads event store |

Key Features

  1. Cross-Model Synergy - Different models attack same problem simultaneously
  2. Intelligence Sharing - Agents broadcast observations in real-time
  3. Convergence Detection - Natural consensus without forced agreement
  4. Adaptive Restart - Agents pivot when receiving novel insights
  5. OpenAI-Compatible API - Expose orchestration as /v1/chat/completions

Ideas Worth Adopting

1. Cross-Model Attack Pattern

What it is: Multiple models solve the same problem in parallel, each with different strategies.

Current BLACKICE approach: Sequential model selection via LLMRouter.

Why adopt: Different models have different strengths. Claude is good at architecture, Ollama/Qwen is fast for iteration, GPT-4 catches edge cases.

Implementation sketch:

import asyncio
from dataclasses import dataclass
from typing import Literal

@dataclass
class AttackStrategy:
    model: str
    approach: Literal["tdd", "doc_first", "refactor", "spike"]
    prompt_modifier: str

@dataclass
class ParallelAttack:
    task: Task
    strategies: list[AttackStrategy]

class CrossModelAttacker:
    """Attack a problem with multiple models simultaneously."""

    async def attack(self, task: Task) -> list[Solution]:
        strategies = [
            AttackStrategy("claude-sonnet-4-20250514", "tdd",
                          "Write tests first, then implement."),
            AttackStrategy("ollama/qwen2.5-coder", "spike",
                          "Quick prototype to explore solution space."),
            AttackStrategy("gpt-4o", "doc_first",
                          "Document the interface, then implement."),
        ]

        # Launch all attacks in parallel
        tasks = [
            self._execute_strategy(task, strategy)
            for strategy in strategies
        ]
        solutions = await asyncio.gather(*tasks)

        # Use existing consensus to pick best
        return await self.consensus.vote(solutions)

Effort: Medium - leverages existing parallel infrastructure

Verdict: YES - Natural extension of current multi-model support.


2. Notification-Based Knowledge Sharing

What it is: Agents publish findings to a shared hub. Other agents can subscribe and react.

Current BLACKICE approach: Direct message broker (request/reply).

Why adopt: Organic knowledge distribution. Agent A finds a bug, Agent B immediately knows.

Implementation sketch:

@dataclass
class Notification:
    agent_id: str
    notification_type: Literal["finding", "blocker", "insight", "partial_solution"]
    content: str
    timestamp: datetime
    relevance_tags: list[str]

class NotificationHub:
    """Pub/sub for agent discoveries."""

    def __init__(self, beads: BeadsClient):
        self.beads = beads
        self.subscribers: dict[str, list[Callable]] = {}

    async def publish(self, notification: Notification):
        # Persist to Beads for replay
        await self.beads.append_event(
            "notification_published",
            notification.__dict__
        )

        # Notify subscribers
        for tag in notification.relevance_tags:
            for callback in self.subscribers.get(tag, []):
                await callback(notification)

    async def subscribe(self, agent_id: str, tags: list[str], callback: Callable):
        for tag in tags:
            self.subscribers.setdefault(tag, []).append(callback)

# Agent usage (as a method on an agent class, so pivot_strategy resolves)
class IterationAgent:
    async def on_finding(self, notification: Notification):
        if notification.notification_type == "blocker":
            # Pivot strategy based on peer's blocker
            await self.pivot_strategy(notification.content)

agent = IterationAgent()
await hub.subscribe("agent-1", ["python", "testing"], agent.on_finding)

Effort: Low-Medium - extends existing message broker

Verdict: YES - More natural than explicit message passing.


3. Convergence Detection (Natural Consensus)

What it is: System detects when agents naturally reach similar conclusions without forced voting.

Current BLACKICE approach: Explicit consensus voting (majority, supermajority, etc.).

Why adopt: Less overhead when agents already agree. Save voting for real disagreements.

Implementation sketch:

@dataclass
class ConvergenceState:
    solutions: list[Solution]
    similarity_matrix: dict[tuple[str, str], float]
    converged: bool
    convergence_score: float

class ConvergenceDetector:
    """Detect natural consensus before forcing vote."""

    def __init__(self, threshold: float = 0.85):
        self.threshold = threshold
        self.embedding_model = "text-embedding-3-small"

    async def check_convergence(self, solutions: list[Solution]) -> ConvergenceState:
        # Embed all solutions
        embeddings = await self._embed_solutions(solutions)

        # Calculate pairwise similarity
        similarity_matrix = {}
        for i, sol_a in enumerate(solutions):
            for j, sol_b in enumerate(solutions[i+1:], i+1):
                similarity = cosine_similarity(embeddings[i], embeddings[j])
                similarity_matrix[(sol_a.agent_id, sol_b.agent_id)] = similarity

        # Check if all pairs above threshold
        avg_similarity = sum(similarity_matrix.values()) / len(similarity_matrix)
        converged = avg_similarity >= self.threshold

        return ConvergenceState(
            solutions=solutions,
            similarity_matrix=similarity_matrix,
            converged=converged,
            convergence_score=avg_similarity
        )

    async def get_consensus(self, solutions: list[Solution]) -> Solution:
        state = await self.check_convergence(solutions)

        if state.converged:
            # Natural consensus - pick any (or merge)
            return await self._merge_similar(solutions)
        else:
            # Fall back to explicit voting
            return await self.consensus_engine.vote(solutions)
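The `cosine_similarity` call above is assumed; a dependency-free version for embeddings as plain float lists:

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 for a zero vector."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0
```

In practice a vectorized implementation (e.g. over a NumPy matrix) would be preferable for many pairwise comparisons; this form just makes the convergence math explicit.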

Effort: Medium - requires embedding infrastructure

Verdict: YES - More efficient than always voting.


4. OpenAI-Compatible API Wrapper

What it is: Expose entire multi-agent system as standard /v1/chat/completions endpoint.

Current BLACKICE approach: CLI only (ralph run).

Why adopt: Any tool expecting OpenAI API can use BLACKICE. IDE plugins, scripts, other agents.

Implementation sketch:

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class ChatCompletionRequest(BaseModel):
    model: str  # Ignored - uses BLACKICE routing
    messages: list[dict]
    temperature: float = 0.7

class ChatCompletionResponse(BaseModel):
    id: str
    choices: list[dict]
    usage: dict

@app.post("/v1/chat/completions")
async def chat_completion(request: ChatCompletionRequest) -> ChatCompletionResponse:
    # Extract task from messages
    task = extract_task(request.messages)

    # Run through full EnterpriseFlywheel
    result = await flywheel.execute_task(task)

    # Format as OpenAI response
    return ChatCompletionResponse(
        id=result.task_id,
        choices=[{
            "message": {"role": "assistant", "content": result.output},
            "finish_reason": "stop"
        }],
        usage={
            "prompt_tokens": result.metrics.prompt_tokens,
            "completion_tokens": result.metrics.completion_tokens,
            "total_tokens": result.metrics.total_tokens
        }
    )

# Run with: uvicorn blackice.api:app --port 8080

Effort: Low - wrapper around existing CLI

Verdict: YES - Unlocks ecosystem integration.


5. Live Progress Visualization

What it is: Real-time display of agent progress and decision-making.

Current BLACKICE approach: CLI output, logs.

Why adopt: See what's happening during long runs. Debug stuck agents.

Implementation sketch:

# Terminal UI with rich
from rich.live import Live
from rich.table import Table
from rich.console import Console

class LiveDashboard:
    def __init__(self, flywheel: EnterpriseFlywheel):
        self.flywheel = flywheel
        self.console = Console()

    def generate_table(self) -> Table:
        table = Table(title="BLACKICE Agent Status")
        table.add_column("Agent")
        table.add_column("Model")
        table.add_column("Status")
        table.add_column("Iteration")
        table.add_column("Tokens")

        for agent in self.flywheel.active_agents:
            table.add_row(
                agent.id,
                agent.model,
                agent.status,
                str(agent.iteration),
                f"{agent.tokens_used:,}"
            )
        return table

    async def run(self, task: Task):
        with Live(self.generate_table(), refresh_per_second=2) as live:
            async for event in self.flywheel.execute_stream(task):
                live.update(self.generate_table())

Effort: Low - uses existing metrics

Verdict: YES - Essential for debugging.


Ideas NOT Worth Adopting

Proprietary Model Lock-in

Why skip: MassGen's strength is 15+ providers, but BLACKICE intentionally limits to Claude + Ollama + Letta for simplicity and control. Adding more providers adds complexity without clear benefit.

No Persistence Model

Why skip: MassGen uses JSON files. BLACKICE's Beads event store is more robust for crash recovery and audit trails.

Terminal-Only UI

Why skip: Already have CLI. A web dashboard (see Superset gist) would be more valuable than another terminal UI.


Summary

| Feature | Worth Adopting? | Effort | Priority |
| --- | --- | --- | --- |
| OpenAI-Compatible API | YES | Low | High |
| Live Progress Visualization | YES | Low | High |
| Cross-Model Attack | YES | Medium | Medium |
| Notification Hub | YES | Low-Medium | Medium |
| Convergence Detection | YES | Medium | Low |


<!-- Source Gist 3 of 19: d33731cdbc2f13b7eb602cdfc6761e1d -->

Superset Ideas Worth Adopting for BLACKICE

Ideas from Superset that could improve BLACKICE.

What is Superset?

Superset is a desktop terminal application for managing 10+ parallel CLI coding agents.

| Aspect | Superset | BLACKICE |
| --- | --- | --- |
| Focus | Terminal UI for parallel agents | Backend orchestration |
| Platform | Electron desktop app (macOS) | Python CLI |
| Workspace Isolation | Git worktrees | Git worktrees (same!) |
| Agent Support | Any CLI agent | Claude, Ollama, Letta, Codex |
| Tech Stack | Electron, React, Bun, tRPC | Python, SQLite |

Key Features

  1. Parallel Agent Management - Run 10+ CLI agents simultaneously
  2. Git Worktree Isolation - Each task gets isolated workspace
  3. Built-in Diff Viewer - Review agent changes visually
  4. Status Monitoring - Notifications when agents complete
  5. Config-Driven Setup - .superset/config.json for automation

Ideas Worth Adopting

1. Desktop UI (Electron)

What it is: Visual desktop app for managing agents instead of CLI-only.

Current BLACKICE approach: CLI only (ralph run, ralph status).

Why adopt: Visual management of 10+ agents is easier than CLI cycling.

Implementation approach:

Option A: Build Electron app (like Superset)
Option B: Build web dashboard (simpler, cross-platform)
Option C: Adopt Superset directly as BLACKICE frontend

Effort: High (new app) or Low (integrate with Superset)

Verdict: MAYBE - Consider integrating with Superset rather than building from scratch.


2. Built-in Diff Viewer

What it is: Visual diff editor for reviewing agent changes before merge.

Current BLACKICE approach: Relies on external tools (git diff, IDE).

Why adopt: Faster review loop → faster iteration.

Implementation sketch:

# Add to CLI
ralph diff <task_id>           # Show diff for task
ralph diff --interactive       # Interactive diff review
ralph diff --accept <task_id>  # Accept changes
ralph diff --reject <task_id>  # Reject changes

# Or web UI
GET /api/tasks/<id>/diff       # Return diff JSON
POST /api/tasks/<id>/accept    # Accept changes
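A minimal Python backend for `ralph diff`, assuming each task runs in an isolated git worktree checked out from a base branch (the `task_diff` helper and the `main` default are illustrative):

```python
import subprocess
from pathlib import Path

def task_diff(worktree: Path, base_branch: str = "main") -> str:
    """Return the unified diff of a task worktree against its base branch.

    Includes uncommitted changes, since agents may leave work unstaged.
    """
    result = subprocess.run(
        ["git", "diff", base_branch],
        cwd=worktree, capture_output=True, text=True, check=True,
    )
    return result.stdout
```

The interactive and accept/reject subcommands would layer on top of this, e.g. by piping the output through a pager or applying `git merge` on accept.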

Effort: Medium

Verdict: YES - Improves review workflow.


3. Status Notifications

What it is: Desktop notifications when agents complete tasks.

Current BLACKICE approach: Must poll ralph status manually.

Why adopt: Don't miss completed work while multitasking.

Implementation sketch:

# macOS
import subprocess
def notify(title: str, message: str):
    subprocess.run([
        "osascript", "-e",
        f'display notification "{message}" with title "{title}"'
    ])

# Cross-platform with plyer
from plyer import notification
notification.notify(title="BLACKICE", message="Task completed!")

# Or websocket for web UI
async def broadcast_completion(task_id: str):
    await websocket.send_json({"event": "task_complete", "task_id": task_id})

Effort: Low

Verdict: YES - Easy win.


4. Config-Driven Workspace Setup

What it is: .superset/config.json automates environment setup per project.

Current BLACKICE approach: Manual config via ~/.ralph/config.yaml.

Why adopt: Project-specific configs for different codebases.

Implementation sketch:

# .blackice/config.yaml (per-project)
project:
  name: "my-api"
  default_model: "claude-3-5-sonnet"
  
worktree:
  base_branch: "main"
  prefix: "blackice-"
  
setup:
  pre_task:
    - "npm install"
    - "docker compose up -d"
  post_task:
    - "npm test"
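A possible loader that overlays the per-project `.blackice/config.yaml` on the global `~/.ralph/config.yaml` (file names from the sketches above; the recursive-merge policy is an assumption, not existing BLACKICE behavior):

```python
from pathlib import Path

import yaml

def deep_merge(base: dict, override: dict) -> dict:
    """Return base with override applied; nested dicts merge key-by-key."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged

def load_config(project_root: Path) -> dict:
    """Load global config, then overlay the project-local one if present."""
    config: dict = {}
    for path in (Path("~/.ralph/config.yaml").expanduser(),
                 project_root / ".blackice" / "config.yaml"):
        if path.exists():
            config = deep_merge(config, yaml.safe_load(path.read_text()) or {})
    return config
```

With this precedence, a repo can pin its own `default_model` or `setup` hooks while inheriting everything else from the global file.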

Effort: Low

Verdict: YES - Per-project configs are useful.


Ideas NOT Worth Adopting

Electron Stack

Why skip: BLACKICE is Python-based. Building a full Electron app is overkill when:

  • A web dashboard would work better
  • tmux UI (like Gas Town) is simpler
  • Could integrate with Superset instead of competing

macOS-Only

Why skip: BLACKICE should remain cross-platform.


Summary

| Feature | Worth Adopting? | Effort | Priority |
| --- | --- | --- | --- |
| Status Notifications | YES | Low | High |
| Built-in Diff Viewer | YES | Medium | Medium |
| Per-Project Config | YES | Low | High |
| Desktop UI | MAYBE | High | Low |

Integration Possibility

Instead of building UI from scratch, consider:

  1. BLACKICE as backend → Superset as frontend
  2. Expose BLACKICE via tRPC or REST API
  3. Let Superset manage the visual layer
┌─────────────────────────────────────┐
│           Superset (UI)             │
│   Electron + React + TailwindCSS    │
└─────────────────┬───────────────────┘
                  │ tRPC / REST
                  ▼
┌─────────────────────────────────────┐
│        BLACKICE (Backend)           │
│   EnterpriseFlywheel + Consensus    │
└─────────────────────────────────────┘


<!-- Source Gist 4 of 19: 17644e057a159c39b9d50c555cefd418 -->

Gas Town Ideas Worth Adopting for BLACKICE

Ideas from Steve Yegge's Gas Town that could improve BLACKICE.

Background

| System | BLACKICE | Gas Town |
| --- | --- | --- |
| Core Pattern | Ralph Loop (iterate until success) | MEOW (molecular workflows) |
| Language | Python (53K lines) | Go (75K lines) |
| Strength | Consensus, observability, multi-LLM | Workflow DSL, visual UI, self-healing |

1. GUPP - Gastown Universal Propulsion Principle

What it is: "If there is work on your hook, YOU MUST RUN IT."

Every agent has a hook - a persistent pointer to work they must execute on startup. This guarantees continuation across crashes and context window exhaustion.

Current BLACKICE approach: Beads event replay - reconstructs state from event history.

Why adopt: GUPP is simpler. Instead of replaying events, just check the hook and continue.

Implementation sketch:

@dataclass
class AgentHook:
    agent_id: str
    current_task_id: str | None
    current_step: int
    molecule_id: str | None  # workflow chain
    
class EnterpriseFlywheel:
    async def on_agent_start(self, agent_id: str):
        hook = await self.beads.get_hook(agent_id)
        if hook.current_task_id:
            # GUPP: Must run hooked work
            await self.continue_task(hook)

Effort: Medium - adds hook table to Beads, modify agent startup


2. MEOW - Molecular Expression of Work

What it is: Workflow algebra with composable primitives:

  • Beads → atomic work units
  • Epics → beads with children
  • Molecules → chained workflow steps
  • Protomolecules → workflow templates
  • Formulas → TOML source that compiles to molecules
  • Wisps → ephemeral molecules (not persisted to git)

Current BLACKICE approach: DAG executor with hardcoded workflows.

Why adopt: Define workflows as data, not code. Compose, template, reuse.

Example Formula (TOML):

[formula]
name = "feature-implementation"
description = "Standard feature workflow"

[[steps]]
id = "design"
name = "Design the feature"
prompt = "Create a design document for: {feature_description}"

[[steps]]
id = "implement"
name = "Implement the feature"
depends_on = ["design"]
prompt = "Implement based on design: {design.output}"

[[steps]]
id = "test"
name = "Write tests"
depends_on = ["implement"]
prompt = "Write tests for: {implement.files_changed}"

[[steps]]
id = "review"
name = "Code review"
depends_on = ["test"]
prompt = "Review implementation against design"

Implementation sketch:

@dataclass
class MoleculeStep:
    id: str
    name: str
    prompt: str
    depends_on: list[str]
    status: Literal["pending", "running", "done", "failed"]

@dataclass
class Molecule:
    id: str
    formula_name: str
    steps: list[MoleculeStep]
    variables: dict[str, Any]
    
    def get_step(self, step_id: str) -> MoleculeStep:
        """Look up a step by id (needed for dependency checks below)."""
        return next(s for s in self.steps if s.id == step_id)

    def next_step(self) -> MoleculeStep | None:
        """Return next runnable step based on dependencies."""
        for step in self.steps:
            if step.status == "pending":
                deps_done = all(
                    self.get_step(d).status == "done"
                    for d in step.depends_on
                )
                if deps_done:
                    return step
        return None

Effort: High - new subsystem, but very powerful


3. Patrol Agents (Self-Healing)

What it is: Background agents that continuously monitor and fix issues:

  • Witness → monitors workers, unsticks stuck agents
  • Deacon → daemon that propagates "do your job" signals
  • Dogs → helpers that handle maintenance tasks

Current BLACKICE approach: No self-healing. Manual intervention required.

Why adopt: System keeps running without human babysitting.

Implementation sketch:

class PatrolAgent:
    """Background agent that runs a patrol loop."""
    
    async def patrol(self):
        while True:
            # Check system health
            stuck_agents = await self.find_stuck_agents()
            for agent in stuck_agents:
                await self.nudge_agent(agent)
            
            # Check merge queue
            pending_merges = await self.check_merge_queue()
            if pending_merges:
                await self.process_merges(pending_merges)
            
            # Exponential backoff if nothing to do
            await self.sleep_with_backoff()

class WitnessAgent(PatrolAgent):
    """Monitors workers and helps them get unstuck."""
    
    async def find_stuck_agents(self) -> list[Agent]:
        agents = await self.beads.get_active_agents()
        stuck = []
        for agent in agents:
            last_activity = await self.beads.get_last_activity(agent.id)
            if self.is_stuck(last_activity):
                stuck.append(agent)
        return stuck
    
    async def nudge_agent(self, agent: Agent):
        """Send GUPP nudge to stuck agent."""
        await self.send_message(agent.id, "Do your job. Check your hook.")

Effort: Medium - add patrol loop, stuck detection heuristics


4. Convoys (Work Bundling)

What it is: A tracking unit that bundles multiple issues/tasks together for delivery.

Instead of tracking individual tasks, track the convoy - the logical unit of work being delivered.

Current BLACKICE approach: Track individual tasks. No bundling.

Why adopt: Better visibility into "what shipped" vs "what tasks ran."

Implementation sketch:

from dataclasses import dataclass, field

@dataclass
class Convoy:
    id: str
    name: str
    task_ids: list[str]
    status: Literal["active", "landed", "failed"]
    started_at: datetime
    description: str = ""                  # optional, so create_convoy can omit it
    landed_at: datetime | None = None
    tasks: list["Task"] = field(default_factory=list)  # resolved Task objects

    @property
    def progress(self) -> float:
        done = sum(1 for t in self.tasks if t.status == "done")
        return done / len(self.tasks) if self.tasks else 0.0

class ConvoyTracker:
    async def create_convoy(self, name: str, task_ids: list[str]) -> Convoy:
        convoy = Convoy(
            id=generate_id(),
            name=name,
            task_ids=task_ids,
            status="active",
            started_at=datetime.now(),
        )
        await self.beads.save_convoy(convoy)
        return convoy
    
    async def check_convoy(self, convoy_id: str) -> Convoy:
        convoy = await self.beads.get_convoy(convoy_id)
        tasks = [await self.beads.get_task(t) for t in convoy.task_ids]
        
        if all(t.status == "done" for t in tasks):
            convoy.status = "landed"
            convoy.landed_at = datetime.now()
            await self.beads.save_convoy(convoy)
        
        return convoy
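The progress math is easy to check in isolation; `ConvoyView` and `TaskStub` below are hypothetical stand-ins that carry resolved task objects:

```python
from dataclasses import dataclass, field

@dataclass
class TaskStub:
    id: str
    status: str = "pending"

@dataclass
class ConvoyView:
    tasks: list[TaskStub] = field(default_factory=list)

    @property
    def progress(self) -> float:
        done = sum(1 for t in self.tasks if t.status == "done")
        return done / len(self.tasks) if self.tasks else 0.0

convoy = ConvoyView(tasks=[
    TaskStub("a", "done"), TaskStub("b"),
    TaskStub("c", "done"), TaskStub("d"),
])
assert convoy.progress == 0.5  # 2 of 4 tasks done
```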

Effort: Low - simple wrapper around existing task tracking


5. tmux UI (Nice to Have)

What it is: Visual management of 20-30 Claude Code instances in tmux.

Current BLACKICE approach: CLI only.

Why adopt: See all agents at once, switch between them, visual monitoring.

Implementation sketch:

# gt (gas town) style commands for BLACKICE
blackice tmux start          # Start tmux session with agent panes
blackice tmux status         # Show all agents in split view
blackice tmux attach <agent> # Attach to specific agent
blackice tmux broadcast <msg> # Send message to all agents
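Under the hood, `blackice tmux start` could shell out to tmux roughly as sketched below; the session name, window layout, and the `blackice agent attach` subcommand are assumptions, not a finalized design:

```python
import shlex

def tmux_start_commands(agents: list[str], session: str = "blackice") -> list[str]:
    """Build the tmux commands that create one pane per agent."""
    cmds = [f"tmux new-session -d -s {session} -n agents"]
    for i, agent in enumerate(agents):
        if i > 0:
            cmds.append(f"tmux split-window -t {session}:agents")
        attach = shlex.quote(f"blackice agent attach {agent}")
        cmds.append(f"tmux send-keys -t {session}:agents.{i} {attach} Enter")
    cmds.append(f"tmux select-layout -t {session}:agents tiled")
    return cmds

for cmd in tmux_start_commands(["planner", "coder"]):
    print(cmd)
```

Each string could then be run with `subprocess.run(shlex.split(cmd))`; building them as plain strings first makes the sequence easy to log and test.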

Effort: Low-Medium - tmux scripting, optional feature


Ideas NOT Worth Adopting

7-Role Model (Mayor, Polecats, etc.)

Why skip: BLACKICE's consensus voting is more flexible. Gas Town's Mayor is a single point of decision-making. Consensus allows multiple agents to vote on solutions, catching more errors.

Nondeterministic Idempotence

Why skip: BLACKICE's Beads event replay is more deterministic and auditable. NDI is "eventually correct" which is fine for vibe coding but not for enterprise use cases.

Claude Code Only

Why skip: BLACKICE's multi-LLM support (Claude, Ollama, Letta, Codex) is a strength. Don't regress to single-provider lock-in.


Priority Order

| Priority | Feature | Effort | Impact |
|---|---|---|---|
| 1 | Convoys | Low | High - better tracking |
| 2 | GUPP | Medium | High - simpler recovery |
| 3 | Patrol Agents | Medium | High - self-healing |
| 4 | MEOW | High | Very High - workflow DSL |
| 5 | tmux UI | Low | Medium - nice to have |


<!-- Source Gist 5 of 19: eff6b4d7204aa95d5b18476569c39682 -->

ClaudeBar Ideas for BLACKICE

Ideas from ClaudeBar for BLACKICE.

What is ClaudeBar?

A macOS menu bar app that monitors AI coding assistant quota usage across multiple providers with clean architecture.

| Aspect | ClaudeBar | BLACKICE |
|---|---|---|
| Focus | Quota monitoring | Iterate-until-success |
| Platform | macOS (SwiftUI) | Python CLI |
| Providers | Claude, Codex, Gemini, Copilot, etc. | Claude, Ollama, Letta |
| Architecture | Protocol-based DI | Adapter pattern |

Key Features

  1. Multi-Provider Monitoring - Track all AI tool quotas in one place
  2. Protocol-Based DI - Injectable, testable abstractions
  3. Repository Pattern - Clean data access layer
  4. Chicago School TDD - Test state changes, not method calls
  5. Threshold Alerts - Color-coded health indicators

Ideas Worth Adopting

1. Resource Quota Monitoring

What it is: Track and display usage across all providers.

Current BLACKICE approach: CostTracker tracks tokens but no dashboard.

Why adopt: Know when you're running low. Plan budget.

Implementation sketch:

from dataclasses import dataclass
from enum import Enum

class QuotaStatus(Enum):
    HEALTHY = "healthy"      # >50%
    WARNING = "warning"      # 20-50%
    CRITICAL = "critical"    # <20%
    DEPLETED = "depleted"    # 0%

@dataclass
class ProviderQuota:
    provider: str
    used: int
    limit: int
    unit: str  # "tokens", "requests", "minutes"
    reset_at: datetime | None = None

    @property
    def remaining(self) -> int:
        return max(0, self.limit - self.used)

    @property
    def percentage(self) -> float:
        if self.limit == 0:
            return 0
        return (self.remaining / self.limit) * 100

    @property
    def status(self) -> QuotaStatus:
        pct = self.percentage
        if pct == 0:
            return QuotaStatus.DEPLETED
        if pct < 20:
            return QuotaStatus.CRITICAL
        if pct < 50:
            return QuotaStatus.WARNING
        return QuotaStatus.HEALTHY

class QuotaMonitor:
    """Monitor quotas across all providers."""

    def __init__(self, providers: list[ProviderProbe]):
        self.providers = {p.name: p for p in providers}
        self._quotas: dict[str, ProviderQuota] = {}

    async def refresh_all(self):
        """Fetch current quotas from all providers."""
        for name, provider in self.providers.items():
            try:
                quota = await provider.get_quota()
                self._quotas[name] = quota
            except Exception as e:
                logger.warning(f"Failed to fetch quota for {name}: {e}")

    def get_status(self) -> dict[str, ProviderQuota]:
        """Get current quota status."""
        return self._quotas.copy()

    def get_summary(self) -> str:
        """Get human-readable summary."""
        lines = ["## Provider Quotas", ""]
        for name, quota in sorted(self._quotas.items()):
            icon = {
                QuotaStatus.HEALTHY: "🟢",
                QuotaStatus.WARNING: "🟡",
                QuotaStatus.CRITICAL: "🔴",
                QuotaStatus.DEPLETED: "⚫",
            }[quota.status]
            lines.append(f"{icon} {name}: {quota.remaining:,}/{quota.limit:,} {quota.unit} ({quota.percentage:.0f}%)")

        return "\n".join(lines)

    def can_use(self, provider: str, amount: int = 1) -> bool:
        """Check if provider has enough quota."""
        quota = self._quotas.get(provider)
        if not quota:
            return True  # Unknown = allow
        return quota.remaining >= amount

# Provider probe interface
class ProviderProbe(Protocol):
    name: str

    async def get_quota(self) -> ProviderQuota:
        """Fetch current quota from provider."""
        ...

# Example: Claude probe
class ClaudeProbe:
    name = "claude"

    async def get_quota(self) -> ProviderQuota:
        # Parse from Claude's usage endpoint
        response = await self._fetch_usage()
        return ProviderQuota(
            provider="claude",
            used=response["tokens_used"],
            limit=response["tokens_limit"],
            unit="tokens",
            reset_at=datetime.fromisoformat(response["reset_at"])
        )
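The status boundaries are worth sanity-checking in isolation; this standalone helper mirrors the QuotaStatus thresholds above:

```python
def quota_status(remaining_pct: float) -> str:
    """Mirror of the QuotaStatus thresholds, as a plain function."""
    if remaining_pct == 0:
        return "depleted"
    if remaining_pct < 20:
        return "critical"
    if remaining_pct < 50:
        return "warning"
    return "healthy"

assert quota_status(75.0) == "healthy"
assert quota_status(35.0) == "warning"
assert quota_status(5.0) == "critical"
assert quota_status(0.0) == "depleted"
```

Note that exactly 50% reads as healthy and exactly 20% as warning, matching the `<` comparisons in the enum logic.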

Effort: Medium

Verdict: YES - Essential for budget management.


2. Protocol-Based Dependency Injection

What it is: Define interfaces as protocols, inject implementations.

Current BLACKICE approach: Direct class dependencies.

Why adopt: Testable. Swappable implementations.

Implementation sketch:

from typing import Protocol, runtime_checkable

@runtime_checkable
class TaskStorage(Protocol):
    """Protocol for task persistence."""

    async def save(self, task: Task) -> None: ...
    async def get(self, task_id: str) -> Task | None: ...
    async def list(self, status: str | None = None) -> list[Task]: ...

@runtime_checkable
class LLMProvider(Protocol):
    """Protocol for LLM interactions."""

    async def generate(self, prompt: str, **kwargs) -> str: ...
    async def get_quota(self) -> ProviderQuota: ...

@runtime_checkable
class EventStore(Protocol):
    """Protocol for event persistence."""

    async def append(self, event: Event) -> None: ...
    async def get_events(self, entity_id: str) -> list[Event]: ...

# Implementations
class SQLiteTaskStorage:
    """SQLite implementation of TaskStorage."""

    def __init__(self, db_path: Path):
        self.db = sqlite3.connect(db_path)

    async def save(self, task: Task) -> None:
        # Implementation
        pass

class InMemoryTaskStorage:
    """In-memory implementation for testing."""

    def __init__(self):
        self._tasks: dict[str, Task] = {}

    async def save(self, task: Task) -> None:
        self._tasks[task.id] = task

    async def get(self, task_id: str) -> Task | None:
        return self._tasks.get(task_id)

# Dependency injection container
@dataclass
class Dependencies:
    task_storage: TaskStorage
    llm_provider: LLMProvider
    event_store: EventStore

def create_production_deps() -> Dependencies:
    return Dependencies(
        task_storage=SQLiteTaskStorage(Path("~/.blackice/tasks.db").expanduser()),
        llm_provider=ClaudeProvider(),
        event_store=BeadsEventStore(Path("~/.beads/beads.db").expanduser())
    )

def create_test_deps() -> Dependencies:
    return Dependencies(
        task_storage=InMemoryTaskStorage(),
        llm_provider=MockLLMProvider(),
        event_store=InMemoryEventStore()
    )

# Usage in flywheel
class EnterpriseFlywheel:
    def __init__(self, deps: Dependencies):
        self.storage = deps.task_storage
        self.llm = deps.llm_provider
        self.events = deps.event_store
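Because the protocols are `runtime_checkable`, wiring code can verify implementations with `isinstance`. Note this checks only that the methods exist, not their signatures. A minimal standalone demo:

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class Storage(Protocol):
    def save(self, key: str, value: str) -> None: ...

class MemoryStorage:
    def __init__(self):
        self.data: dict[str, str] = {}

    def save(self, key: str, value: str) -> None:
        self.data[key] = value

# Structural typing: MemoryStorage never mentions Storage, yet satisfies it
assert isinstance(MemoryStorage(), Storage)
assert not isinstance(object(), Storage)
```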

Effort: Medium

Verdict: YES - Better testability.


3. Chicago School TDD

What it is: Test observable outcomes, not implementation details.

Current BLACKICE approach: Mix of state and mock-based tests.

Why adopt: Less brittle tests. Focus on behavior.

Implementation sketch:

import pytest

# BAD: Testing implementation details (London School)
class TestFlywheelBad:
    def test_execute_calls_llm(self, mocker):
        # Fragile: breaks if implementation changes
        mock_llm = mocker.patch("blackice.llm.generate")
        flywheel = Flywheel()
        flywheel.execute(task)
        mock_llm.assert_called_once()  # ❌ Testing HOW, not WHAT

# GOOD: Testing observable outcomes (Chicago School)
class TestFlywheelGood:
    async def test_execute_produces_result(self, deps):
        # Robust: tests observable outcome
        flywheel = Flywheel(deps)
        task = Task(id="1", description="Write hello world")

        result = await flywheel.execute(task)

        # ✅ Testing WHAT happened, not HOW
        assert result.status == "success"
        assert "hello" in result.output.lower()
        assert await deps.task_storage.get("1") is not None

    async def test_execute_persists_events(self, deps):
        flywheel = Flywheel(deps)
        task = Task(id="1", description="Write hello world")

        await flywheel.execute(task)

        # ✅ Testing observable state change
        events = await deps.event_store.get_events("1")
        assert len(events) >= 2  # At least start and complete
        assert events[0].type == "task_started"
        assert events[-1].type in ("task_completed", "task_failed")

    async def test_execute_respects_budget(self, deps):
        deps.llm_provider.quota = ProviderQuota(
            provider="test", used=990, limit=1000, unit="tokens"
        )
        flywheel = Flywheel(deps)
        task = Task(id="1", description="Write something long")

        result = await flywheel.execute(task)

        # ✅ Testing observable behavior
        assert result.status == "failed"
        assert "budget" in result.error.lower()

# Test fixtures using dependency injection
@pytest.fixture
def deps():
    return create_test_deps()

@pytest.fixture
def flywheel(deps):
    return Flywheel(deps)

Effort: Low (mindset change)

Verdict: YES - Better tests.


4. Provider Registry Pattern

What it is: Providers self-register capabilities.

Current BLACKICE approach: Hardcoded provider list.

Why adopt: Easy to add new providers. Plugin-friendly.

Implementation sketch:

from typing import Type

class ProviderRegistry:
    """Registry for LLM providers."""

    _providers: dict[str, Type[LLMProvider]] = {}

    @classmethod
    def register(cls, name: str):
        """Decorator to register a provider."""
        def decorator(provider_class: Type[LLMProvider]):
            cls._providers[name] = provider_class
            return provider_class
        return decorator

    @classmethod
    def get(cls, name: str) -> Type[LLMProvider] | None:
        return cls._providers.get(name)

    @classmethod
    def list_all(cls) -> list[str]:
        return list(cls._providers.keys())

    @classmethod
    def create(cls, name: str, **config) -> LLMProvider:
        provider_class = cls._providers.get(name)
        if not provider_class:
            raise ValueError(f"Unknown provider: {name}")
        return provider_class(**config)

# Providers self-register
@ProviderRegistry.register("claude")
class ClaudeProvider:
    def __init__(self, api_key: str | None = None, model: str = "claude-sonnet-4-20250514"):
        self.api_key = api_key or os.environ.get("ANTHROPIC_API_KEY")
        self.model = model

    async def generate(self, prompt: str, **kwargs) -> str:
        # Implementation
        pass

@ProviderRegistry.register("ollama")
class OllamaProvider:
    def __init__(self, base_url: str = "http://localhost:11434", model: str = "qwen2.5-coder"):
        self.base_url = base_url
        self.model = model

    async def generate(self, prompt: str, **kwargs) -> str:
        # Implementation
        pass

@ProviderRegistry.register("letta")
class LettaProvider:
    def __init__(self, base_url: str = "http://localhost:8283"):
        self.base_url = base_url

    async def generate(self, prompt: str, **kwargs) -> str:
        # Implementation
        pass

# Usage
available = ProviderRegistry.list_all()  # ["claude", "ollama", "letta"]
provider = ProviderRegistry.create("claude", model="claude-opus-4-5")

Effort: Low

Verdict: YES - Clean extensibility.


5. Threshold-Based Alerts

What it is: Color-coded alerts at configurable thresholds.

Current BLACKICE approach: Log warnings only.

Why adopt: Visual status. Proactive alerts.

Implementation sketch:

@dataclass
class AlertThreshold:
    name: str
    operator: Literal["<", ">", "<=", ">=", "=="]
    value: float
    severity: Literal["info", "warning", "critical"]
    message_template: str

DEFAULT_THRESHOLDS = [
    AlertThreshold("quota_warning", "<", 50, "warning", "Quota below 50%: {value:.0f}%"),
    AlertThreshold("quota_critical", "<", 20, "critical", "Quota critical: {value:.0f}%"),
    AlertThreshold("quota_depleted", "==", 0, "critical", "Quota depleted!"),
    AlertThreshold("error_rate_high", ">", 0.3, "warning", "Error rate high: {value:.0%}"),
    AlertThreshold("latency_high", ">", 5000, "warning", "Latency high: {value}ms"),
]

class AlertManager:
    """Manage threshold-based alerts."""

    def __init__(self, thresholds: list[AlertThreshold] | None = None):
        self.thresholds = thresholds or DEFAULT_THRESHOLDS
        self.active_alerts: dict[str, Alert] = {}

    def check(self, metric: str, value: float) -> list[Alert]:
        """Check metric against thresholds."""
        alerts = []
        for threshold in self.thresholds:
            if not self._matches(threshold, metric):
                continue

            triggered = self._evaluate(threshold, value)
            alert_key = f"{metric}:{threshold.name}"

            if triggered:
                alert = Alert(
                    key=alert_key,
                    severity=threshold.severity,
                    message=threshold.message_template.format(value=value),
                    triggered_at=datetime.now()
                )
                self.active_alerts[alert_key] = alert
                alerts.append(alert)
            elif alert_key in self.active_alerts:
                # Alert resolved
                del self.active_alerts[alert_key]

        return alerts

    def _matches(self, threshold: AlertThreshold, metric: str) -> bool:
        # Naive routing convention (an assumption, not from ClaudeBar):
        # "quota_warning" applies to any metric containing "quota",
        # e.g. "claude_quota".
        return threshold.name.split("_")[0] in metric

    def _evaluate(self, threshold: AlertThreshold, value: float) -> bool:
        ops = {
            "<": lambda a, b: a < b,
            ">": lambda a, b: a > b,
            "<=": lambda a, b: a <= b,
            ">=": lambda a, b: a >= b,
            "==": lambda a, b: a == b,
        }
        return ops[threshold.operator](value, threshold.value)

# Integration with monitoring
async def monitoring_loop():
    alert_manager = AlertManager()
    quota_monitor = QuotaMonitor(providers)

    while True:
        await quota_monitor.refresh_all()

        for name, quota in quota_monitor.get_status().items():
            alerts = alert_manager.check(f"{name}_quota", quota.percentage)
            for alert in alerts:
                await notify(alert)  # Desktop notification, webhook, etc.

        await asyncio.sleep(60)

Effort: Low

Verdict: YES - Proactive alerting.


Ideas NOT Worth Adopting

macOS-Only Platform

Why skip: BLACKICE should remain cross-platform.

SwiftUI Implementation

Why skip: BLACKICE is Python.


Summary

| Feature | Worth Adopting? | Effort | Priority |
|---|---|---|---|
| Quota Monitoring | YES | Medium | High |
| Provider Registry | YES | Low | High |
| Protocol-Based DI | YES | Medium | Medium |
| Chicago School TDD | YES | Low | Medium |
| Threshold Alerts | YES | Low | Low |


<!-- Source Gist 6 of 19: d1e2505a8ecf2bf430156b889c102dd6 -->

Quint Code Ideas for BLACKICE

Ideas from Quint Code for BLACKICE.

What is Quint Code?

Structured reasoning for AI coding tools using the First Principles Framework (FPF). Transforms chaotic AI decision-making into transparent, evidence-backed audit trails.

| Aspect | Quint Code | BLACKICE |
|---|---|---|
| Focus | Structured reasoning | Iterate-until-success |
| Method | FPF (abduction/deduction/induction) | Ralph Loop + consensus |
| State | .quint/ directory | Beads event store |
| Output | Decision documents | Task results |

Key Features

  1. Decision Documentation - Every choice preserved with rationale
  2. Hypothesis Scaffolding - Generate competing alternatives before convergence
  3. Evidence Lifecycle - Decay stale evidence, actualize with code changes
  4. Bias Auditing - Calculate confidence scores
  5. Q-Cycle Workflow - Q0 → Q5 structured reasoning phases

Ideas Worth Adopting

1. Q-Cycle Structured Reasoning

What it is: 6-phase reasoning cycle from problem to decision.

Current BLACKICE approach: Ad-hoc reasoning in prompts.

Why adopt: Consistent reasoning process. Better decisions.

Implementation sketch:

from enum import Enum

class QPhase(Enum):
    Q0_INIT = "init"           # Define problem
    Q1_HYPOTHESIZE = "hypothesize"  # Generate alternatives
    Q2_SUPPORT = "support"      # Gather evidence
    Q3_CHALLENGE = "challenge"  # Find counter-evidence
    Q4_AUDIT = "audit"          # Check biases
    Q5_DECIDE = "decide"        # Make decision

@dataclass
class QCycleState:
    phase: QPhase
    problem: str
    hypotheses: list[dict]      # {id, description, confidence}
    evidence: list[dict]        # {id, hypothesis_id, type, content, weight}
    challenges: list[dict]      # {id, hypothesis_id, content}
    audit_results: dict         # {biases_found, confidence_adjustments}
    decision: dict | None       # {hypothesis_id, rationale, confidence}

class QCycleRunner:
    """Run structured Q-Cycle reasoning."""

    def __init__(self, llm: LLMAdapter):
        self.llm = llm

    async def run_cycle(self, problem: str) -> QCycleState:
        """Run complete Q-Cycle."""
        state = QCycleState(
            phase=QPhase.Q0_INIT,
            problem=problem,
            hypotheses=[],
            evidence=[],
            challenges=[],
            audit_results={},
            decision=None
        )

        # Q0: Initialize
        state = await self._q0_init(state)

        # Q1: Generate hypotheses
        state = await self._q1_hypothesize(state)

        # Q2: Gather supporting evidence
        state = await self._q2_support(state)

        # Q3: Find challenges
        state = await self._q3_challenge(state)

        # Q4: Audit for biases
        state = await self._q4_audit(state)

        # Q5: Make decision
        state = await self._q5_decide(state)

        return state

    async def _q1_hypothesize(self, state: QCycleState) -> QCycleState:
        """Generate competing hypotheses."""
        prompt = f"""
Problem: {state.problem}

Generate 3-5 distinct hypotheses/approaches to solve this problem.
For each hypothesis:
- Give it a unique ID (H1, H2, etc.)
- Describe the approach
- Assign initial confidence (0-1)

Format as JSON:
[{{"id": "H1", "description": "...", "confidence": 0.5}}, ...]
"""
        response = await self.llm.generate(prompt)
        state.hypotheses = json.loads(response)
        state.phase = QPhase.Q1_HYPOTHESIZE
        return state

    async def _q4_audit(self, state: QCycleState) -> QCycleState:
        """Audit for cognitive biases."""
        prompt = f"""
Review these hypotheses and evidence for cognitive biases:

Hypotheses:
{json.dumps(state.hypotheses, indent=2)}

Evidence:
{json.dumps(state.evidence, indent=2)}

Challenges:
{json.dumps(state.challenges, indent=2)}

Check for:
- Confirmation bias (favoring evidence that supports preferred hypothesis)
- Anchoring bias (over-weighting first hypothesis)
- Availability bias (favoring easily recalled examples)
- Overconfidence

For each bias found, suggest confidence adjustments.

Format:
{{"biases_found": ["..."], "confidence_adjustments": {{"H1": -0.1, "H2": +0.1}}}}
"""
        response = await self.llm.generate(prompt)
        state.audit_results = json.loads(response)

        # Apply adjustments
        for h in state.hypotheses:
            adj = state.audit_results["confidence_adjustments"].get(h["id"], 0)
            h["confidence"] = max(0, min(1, h["confidence"] + adj))

        state.phase = QPhase.Q4_AUDIT
        return state
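End to end, the Q1 parse-and-rank step behaves like this; `CannedLLM` is a stand-in that returns fixed JSON, and the cycle is collapsed to a single phase for illustration:

```python
import asyncio
import json

class CannedLLM:
    """Stand-in LLM returning a fixed hypothesis list."""
    async def generate(self, prompt: str) -> str:
        return json.dumps([
            {"id": "H1", "description": "Cache at the adapter layer", "confidence": 0.6},
            {"id": "H2", "description": "Cache at the CLI layer", "confidence": 0.4},
        ])

async def leading_hypothesis(llm) -> dict:
    # Mirrors _q1_hypothesize: parse the JSON, then rank by confidence
    hypotheses = json.loads(await llm.generate("How should we cache reviews?"))
    return max(hypotheses, key=lambda h: h["confidence"])

best = asyncio.run(leading_hypothesis(CannedLLM()))
assert best["id"] == "H1"
```

A real run would keep all hypotheses alive through Q2-Q4 rather than converging immediately; ranking here only shows the data flow.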

Effort: Medium-High

Verdict: YES - More rigorous than ad-hoc reasoning.


2. Evidence Decay

What it is: Old evidence loses weight over time. Stale evidence is marked.

Current BLACKICE approach: All evidence weighted equally.

Why adopt: Codebase changes. Old evidence may be invalid.

Implementation sketch:

from datetime import datetime, timedelta

@dataclass
class Evidence:
    id: str
    content: str
    source: str
    created_at: datetime
    weight: float
    decay_rate: float = 0.1  # Lose 10% weight per week

    @property
    def current_weight(self) -> float:
        """Calculate decayed weight."""
        age = datetime.now() - self.created_at
        weeks = age.total_seconds() / (7 * 24 * 3600)
        decay_factor = (1 - self.decay_rate) ** weeks
        return self.weight * decay_factor

    @property
    def is_stale(self) -> bool:
        """Check if evidence is too old to be reliable."""
        return self.current_weight < 0.2

class EvidenceManager:
    """Manage evidence with decay."""

    def __init__(self, db_path: Path):
        self.db = sqlite3.connect(db_path)
        self._init_schema()

    def add(self, evidence: Evidence):
        """Add new evidence."""
        self.db.execute("""
            INSERT INTO evidence (id, content, source, created_at, weight, decay_rate)
            VALUES (?, ?, ?, ?, ?, ?)
        """, (evidence.id, evidence.content, evidence.source,
              evidence.created_at.isoformat(), evidence.weight, evidence.decay_rate))
        self.db.commit()

    def get_valid(self, hypothesis_id: str) -> list[Evidence]:
        """Get non-stale evidence for hypothesis."""
        # current_weight decays in Python, so filter/sort here rather than
        # in SQL. Assumes the evidence table also has a hypothesis_id column.
        cursor = self.db.execute("""
            SELECT id, content, source, created_at, weight, decay_rate
            FROM evidence WHERE hypothesis_id = ?
        """, (hypothesis_id,))

        rows = [Evidence(r[0], r[1], r[2], datetime.fromisoformat(r[3]), r[4], r[5])
                for r in cursor.fetchall()]
        return sorted((e for e in rows if not e.is_stale),
                      key=lambda e: e.current_weight, reverse=True)

    def mark_stale(self, evidence_id: str, reason: str):
        """Manually mark evidence as stale."""
        self.db.execute("""
            UPDATE evidence
            SET weight = 0, stale_reason = ?
            WHERE id = ?
        """, (reason, evidence_id))
        self.db.commit()

    def refresh(self, evidence_id: str, new_content: str):
        """Refresh evidence with new information."""
        self.db.execute("""
            UPDATE evidence
            SET content = ?, created_at = ?, weight = 1.0
            WHERE id = ?
        """, (new_content, datetime.now().isoformat(), evidence_id))
        self.db.commit()

# Commands for evidence management
# /q-decay   - Show stale evidence
# /q-refresh - Refresh evidence from current code
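The decay curve deserves a worked example: with 10% weekly decay, evidence at weight 1.0 drops below the 0.2 staleness floor after about 16 weeks.

```python
def decayed_weight(weight: float, decay_rate: float, weeks: float) -> float:
    """Same formula as Evidence.current_weight above."""
    return weight * (1 - decay_rate) ** weeks

w = decayed_weight(1.0, 0.1, 3)
assert abs(w - 0.729) < 1e-9                 # 0.9 ** 3 after three weeks
assert decayed_weight(1.0, 0.1, 15) > 0.2    # still valid at week 15
assert decayed_weight(1.0, 0.1, 16) < 0.2    # stale by week 16
```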

Effort: Medium

Verdict: YES - Realistic evidence handling.


3. Confidence Scoring

What it is: Numerical confidence on decisions with explicit calculation.

Current BLACKICE approach: Binary pass/fail.

Why adopt: Weight agent proposals in consensus. Detect overconfidence.

Implementation sketch:

@dataclass
class ConfidenceBreakdown:
    base_confidence: float        # From hypothesis generation
    evidence_support: float       # +/- from supporting evidence
    evidence_challenge: float     # +/- from challenging evidence
    bias_adjustment: float        # From audit
    historical_accuracy: float    # Past accuracy on similar decisions
    final_confidence: float

class ConfidenceCalculator:
    """Calculate and explain confidence scores."""

    def __init__(self, history_db: Path):
        self.history = HistoricalAccuracy(history_db)

    def calculate(
        self,
        hypothesis: dict,
        supporting: list[Evidence],
        challenging: list[Evidence],
        bias_adjustment: float = 0
    ) -> ConfidenceBreakdown:
        """Calculate confidence with full breakdown."""

        base = hypothesis["confidence"]

        # Evidence support
        support_weight = sum(e.current_weight for e in supporting)
        evidence_support = min(0.3, support_weight * 0.1)

        # Evidence challenges
        challenge_weight = sum(e.current_weight for e in challenging)
        evidence_challenge = -min(0.3, challenge_weight * 0.1)

        # Historical accuracy
        similar_decisions = self.history.find_similar(hypothesis["description"])
        if similar_decisions:
            historical = sum(d.was_correct for d in similar_decisions) / len(similar_decisions)
            historical_adjustment = (historical - 0.5) * 0.2  # +/- 0.1 max
        else:
            historical_adjustment = 0

        final = base + evidence_support + evidence_challenge + bias_adjustment + historical_adjustment
        final = max(0, min(1, final))  # Clamp to [0, 1]

        return ConfidenceBreakdown(
            base_confidence=base,
            evidence_support=evidence_support,
            evidence_challenge=evidence_challenge,
            bias_adjustment=bias_adjustment,
            historical_accuracy=historical_adjustment,
            final_confidence=final
        )

    def explain(self, breakdown: ConfidenceBreakdown) -> str:
        """Human-readable confidence explanation."""
        return f"""
Confidence: {breakdown.final_confidence:.0%}

Breakdown:
- Base confidence: {breakdown.base_confidence:.0%}
- Supporting evidence: {breakdown.evidence_support:+.0%}
- Challenging evidence: {breakdown.evidence_challenge:+.0%}
- Bias adjustment: {breakdown.bias_adjustment:+.0%}
- Historical accuracy: {breakdown.historical_accuracy:+.0%}
"""

Effort: Medium

Verdict: YES - Explicit confidence is useful.


4. Decision Documents

What it is: Every decision preserved with full rationale.

Current BLACKICE approach: Decisions in Beads events (less structured).

Why adopt: Audit trail. Learn from past decisions. Debug bad choices.

Implementation sketch:

@dataclass
class Decision:
    id: str
    task_id: str
    timestamp: datetime
    problem: str
    chosen_hypothesis: str
    alternatives_considered: list[str]
    rationale: str
    confidence: ConfidenceBreakdown
    evidence_used: list[str]
    outcome: Literal["pending", "success", "failure"] = "pending"
    outcome_notes: str | None = None

class DecisionStore:
    """Store and retrieve decision documents."""

    def __init__(self, base_path: Path):
        self.base_path = base_path / ".quint" / "decisions"
        self.base_path.mkdir(parents=True, exist_ok=True)

    def save(self, decision: Decision):
        """Save decision document."""
        path = self.base_path / f"{decision.id}.md"

        content = f"""# Decision: {decision.id}

## Problem
{decision.problem}

## Chosen Approach
{decision.chosen_hypothesis}

## Alternatives Considered
{chr(10).join(f"- {a}" for a in decision.alternatives_considered)}

## Rationale
{decision.rationale}

## Confidence
{self._format_confidence(decision.confidence)}

## Evidence Used
{chr(10).join(f"- {e}" for e in decision.evidence_used)}

## Outcome
Status: {decision.outcome}
{decision.outcome_notes or ""}

---
Timestamp: {decision.timestamp.isoformat()}
Task: {decision.task_id}
"""
        path.write_text(content)

    def record_outcome(self, decision_id: str, outcome: str, notes: str):
        """Record outcome for learning."""
        # Update decision document
        # Also update historical accuracy database
        pass

    def find_similar(self, problem: str) -> list[Decision]:
        """Find past decisions on similar problems."""
        # Search through decision documents
        # Return relevant past decisions
        pass

Effort: Medium

Verdict: YES - Better than unstructured events.


Ideas NOT Worth Adopting

MCP Server Architecture

Why skip: BLACKICE has its own architecture.

Slash Command Interface

Why skip: BLACKICE has its own CLI design.


Summary

| Feature | Worth Adopting? | Effort | Priority |
|---|---|---|---|
| Q-Cycle Structured Reasoning | YES | Medium | High |
| Confidence Scoring | YES | Medium | High |
| Decision Documents | YES | Medium | Medium |
| Evidence Decay | YES | Medium | Low |


<!-- Source Gist 7 of 19: 6a08ce38cb1dd646e0bce1e405e9c709 -->

Gentleman Guardian Angel Ideas for BLACKICE

Ideas from Gentleman Guardian Angel for BLACKICE.

What is Gentleman Guardian Angel?

A provider-agnostic AI code review tool that runs as a git pre-commit hook, validating staged files against project standards.

| Aspect | Guardian Angel | BLACKICE |
|---|---|---|
| Focus | Pre-commit code review | Iterate-until-success |
| Integration | Git hooks | CLI |
| Providers | Claude, Gemini, Ollama, any CLI | Claude, Ollama, Letta |
| Dependencies | Pure Bash | Python |

Key Features

  1. Provider Agnostic - Works with any CLI-based AI
  2. Git-Native - Standard pre-commit/commit-msg hooks
  3. File Pattern Matching - Include/exclude specific file types
  4. Intelligent Caching - Skip unchanged files
  5. Zero Dependencies - Pure Bash implementation

Ideas Worth Adopting

1. Git Hook Integration

What it is: Run AI review as part of git workflow automatically.

Current BLACKICE approach: Manual invocation only.

Why adopt: Enforce quality at commit time. No forgotten reviews.

Implementation sketch:

# blackice-hooks/pre-commit
#!/usr/bin/env python3
"""Pre-commit hook for BLACKICE code review."""

import subprocess
import sys
from pathlib import Path

def get_staged_files() -> list[Path]:
    """Get list of staged files."""
    result = subprocess.run(
        ["git", "diff", "--cached", "--name-only", "--diff-filter=ACM"],
        capture_output=True, text=True
    )
    return [Path(f) for f in result.stdout.strip().split("\n") if f]

def should_review(file: Path, patterns: list[str]) -> bool:
    """Check if file matches review patterns."""
    for pattern in patterns:
        if file.match(pattern):
            return True
    return False

def run_review(files: list[Path]) -> tuple[bool, str]:
    """Run BLACKICE review on files."""
    from blackice import QuickReviewer

    reviewer = QuickReviewer()
    results = []

    for file in files:
        result = reviewer.review(file)
        results.append((file, result))

    # Check for blocking issues
    blocking = [(f, r) for f, r in results if r.severity == "error"]
    if blocking:
        return False, format_issues(blocking)

    return True, ""

def format_issues(blocking: list) -> str:
    """Render blocking issues, one line per file."""
    return "\n".join(f"{f}: {r}" for f, r in blocking)

def main():
    files = get_staged_files()
    patterns = ["*.py", "*.ts", "*.js", "*.tsx", "*.jsx"]

    reviewable = [f for f in files if should_review(f, patterns)]
    if not reviewable:
        sys.exit(0)

    print(f"🔍 Reviewing {len(reviewable)} files...")
    passed, message = run_review(reviewable)

    if not passed:
        print(f"❌ Review failed:\n{message}")
        print("\nFix issues or use --no-verify to skip")
        sys.exit(1)

    print("✅ Review passed")
    sys.exit(0)

if __name__ == "__main__":
    main()

# Installation script (separate file), run via `blackice hooks install`
#!/bin/bash

HOOK_DIR=".git/hooks"
PRE_COMMIT="$HOOK_DIR/pre-commit"

cat > "$PRE_COMMIT" << 'EOF'
#!/bin/bash
python3 -m blackice.hooks.pre_commit
EOF

chmod +x "$PRE_COMMIT"
echo "✅ Pre-commit hook installed"
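One caveat: files written into `.git/hooks` are not version controlled. An alternative, assuming Git 2.9 or later, is to keep the hooks in a tracked directory and point Git at it:

```shell
# Keep hooks in a repo-tracked directory instead of .git/hooks
mkdir -p .blackice-hooks
git config core.hooksPath .blackice-hooks
```

This lets the team share hook updates through normal commits.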

Effort: Low

Verdict: YES - Automatic quality enforcement.


2. Content-Addressable Caching

What it is: Hash-based cache that skips unchanged files.

Current BLACKICE approach: No review caching.

Why adopt: Don't re-review unchanged files. Faster commits.

Implementation sketch:

import hashlib
import json
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path

@dataclass
class CacheEntry:
    file_hash: str
    rules_hash: str
    result: str
    timestamp: datetime

class ReviewCache:
    """Content-addressable cache for code reviews."""

    def __init__(self, cache_dir: Path = None):
        self.cache_dir = cache_dir or Path.home() / ".cache" / "blackice" / "reviews"
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def _hash_file(self, file: Path) -> str:
        """Hash file contents."""
        return hashlib.sha256(file.read_bytes()).hexdigest()

    def _hash_rules(self, rules_file: Path) -> str:
        """Hash rules file to detect rule changes."""
        if not rules_file.exists():
            return "default"
        return hashlib.sha256(rules_file.read_bytes()).hexdigest()[:16]

    def _cache_key(self, file: Path, rules_hash: str) -> str:
        """Generate cache key from file hash + rules hash."""
        file_hash = self._hash_file(file)
        return f"{file_hash[:16]}_{rules_hash}"

    def get(self, file: Path, rules_file: Path) -> str | None:
        """Get cached review result if valid."""
        rules_hash = self._hash_rules(rules_file)
        key = self._cache_key(file, rules_hash)
        cache_file = self.cache_dir / f"{key}.json"

        if cache_file.exists():
            entry = CacheEntry(**json.loads(cache_file.read_text()))
            # Verify hashes still match
            if entry.file_hash == self._hash_file(file):
                return entry.result
            # Cache invalidated by content change
            cache_file.unlink()

        return None

    def set(self, file: Path, rules_file: Path, result: str):
        """Cache review result."""
        rules_hash = self._hash_rules(rules_file)
        key = self._cache_key(file, rules_hash)

        entry = CacheEntry(
            file_hash=self._hash_file(file),
            rules_hash=rules_hash,
            result=result,
            timestamp=datetime.now()
        )

        cache_file = self.cache_dir / f"{key}.json"
        cache_file.write_text(json.dumps(entry.__dict__, default=str))

    def invalidate_all(self):
        """Clear entire cache (e.g., when rules change)."""
        for f in self.cache_dir.glob("*.json"):
            f.unlink()

# Usage in reviewer
cache = ReviewCache()

for file in files_to_review:
    cached = cache.get(file, rules_file)
    if cached:
        print(f"⏭️  {file} (cached)")
        continue

    result = await review_file(file)
    cache.set(file, rules_file, result)
    print(f"✅ {file} reviewed")
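The property that makes this safe is that the key is a pure function of the file bytes and the rules bytes, so a cache hit is only possible when neither has changed. A stripped-down sketch of just that invariant:

```python
import hashlib

def cache_key(content: bytes, rules: bytes) -> str:
    """Pure function of content + rules bytes."""
    c = hashlib.sha256(content).hexdigest()[:16]
    r = hashlib.sha256(rules).hexdigest()[:16]
    return f"{c}_{r}"

# Same inputs -> same key; changing either the file or the rules misses the cache
assert cache_key(b"code", b"rules") == cache_key(b"code", b"rules")
assert cache_key(b"code", b"rules") != cache_key(b"code", b"new rules")
```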

Effort: Low

Verdict: YES - Faster reviews.


3. External Rules File

What it is: Project standards defined in a separate file, not hardcoded.

Current BLACKICE approach: Prompts embedded in code.

Why adopt: Easy to update rules without code changes. Version controlled.

Implementation sketch:

<!-- AGENTS.md - Project coding standards -->
# Code Review Standards

## Required Patterns
- All functions must have docstrings
- Type hints required for function parameters and returns
- Maximum function length: 50 lines
- Maximum file length: 500 lines

## Forbidden Patterns
- No `print()` statements in production code
- No hardcoded credentials or API keys
- No `TODO` comments older than 30 days
- No unused imports

## Style Guidelines
- Use f-strings over .format() or %
- Prefer list comprehensions over map/filter
- Use pathlib over os.path
- Snake_case for functions, PascalCase for classes

## Security Requirements
- Sanitize all user input
- Use parameterized queries for SQL
- Validate file paths to prevent traversal
- No shell=True in subprocess calls

## Test Requirements
- All public functions must have tests
- Minimum coverage: 80%
- Use pytest, not unittest

class RulesLoader:
    """Load rules from external file."""

    DEFAULT_PATH = Path("AGENTS.md")

    def load(self, path: Path = None) -> str:
        """Load rules file as prompt context."""
        path = path or self.DEFAULT_PATH

        if not path.exists():
            return self._default_rules()

        content = path.read_text()
        return self._parse_rules(content)

    def _parse_rules(self, content: str) -> str:
        """Parse markdown rules into structured prompt."""
        # Keep as markdown - LLMs understand it well
        return f"""
You are a code reviewer. Apply these project-specific standards:

{content}

Review the following code and identify any violations of these standards.
Format your response as:
- ❌ VIOLATION: <description> (line X)
- ⚠️ WARNING: <description>
- ✅ PASS if no issues found
"""

    def _default_rules(self) -> str:
        """Default rules if no file exists."""
        return """
You are a code reviewer. Check for:
- Code quality and readability
- Potential bugs or errors
- Security issues
- Performance problems

Be constructive but thorough.
"""

Effort: Low

Verdict: YES - Configurable rules.


4. Strict Mode for CI

What it is: Fail CI on ambiguous AI responses.

Current BLACKICE approach: Trust AI output.

Why adopt: Don't let unclear reviews pass. Human must resolve ambiguity.

Implementation sketch:

import sys
from dataclasses import dataclass
from typing import Literal

@dataclass
class ReviewResult:
    status: Literal["pass", "fail", "ambiguous"]
    issues: list[str]
    raw_response: str

class StrictModeReviewer:
    """Reviewer with strict mode for CI."""

    PASS_INDICATORS = ["✅", "PASS", "no issues", "looks good", "approved"]
    FAIL_INDICATORS = ["❌", "FAIL", "violation", "error", "must fix"]
    AMBIGUOUS_INDICATORS = ["might", "could", "consider", "possibly", "unclear"]

    def __init__(self, strict: bool = False):
        self.strict = strict

    def parse_result(self, response: str) -> ReviewResult:
        """Parse AI response into structured result."""
        response_lower = response.lower()

        # Check for clear pass
        if any(ind.lower() in response_lower for ind in self.PASS_INDICATORS):
            has_fail = any(ind.lower() in response_lower for ind in self.FAIL_INDICATORS)
            if not has_fail:
                return ReviewResult("pass", [], response)

        # Check for clear fail
        if any(ind.lower() in response_lower for ind in self.FAIL_INDICATORS):
            issues = self._extract_issues(response)
            return ReviewResult("fail", issues, response)

        # Check for ambiguous
        if self.strict and any(ind in response_lower for ind in self.AMBIGUOUS_INDICATORS):
            return ReviewResult("ambiguous", ["Response was ambiguous"], response)

        # Default based on mode
        if self.strict:
            return ReviewResult("ambiguous", ["Could not determine result"], response)
        return ReviewResult("pass", [], response)

    def _extract_issues(self, response: str) -> list[str]:
        """Extract issue descriptions from response."""
        issues = []
        for line in response.split("\n"):
            if any(ind in line for ind in ["❌", "VIOLATION", "ERROR"]):
                issues.append(line.strip())
        return issues

# CI usage
reviewer = StrictModeReviewer(strict=True)
result = reviewer.parse_result(ai_response)

if result.status == "ambiguous":
    print("⚠️ Review result was ambiguous. Manual review required.")
    print(f"Raw response:\n{result.raw_response}")
    sys.exit(1)

Effort: Low

Verdict: YES - Safer for CI pipelines.


5. Zero-Dependency Design

What it is: Pure Bash implementation with no runtime dependencies.

Current BLACKICE approach: Python with many dependencies.

Why adopt: Works anywhere. No installation friction.

BUT: This is a design choice, not a feature. BLACKICE is Python.

Verdict: NO - Python is fine. Don't rewrite in Bash.


Ideas NOT Worth Adopting

Pure Bash Implementation

Why skip: BLACKICE is Python-native. Bash limits functionality.

Single-File Design

Why skip: BLACKICE needs proper package structure.


Summary

| Feature | Worth Adopting? | Effort | Priority |
|---|---|---|---|
| Git Hook Integration | YES | Low | High |
| Content-Addressable Cache | YES | Low | High |
| External Rules File | YES | Low | Medium |
| Strict Mode for CI | YES | Low | Medium |
| Zero Dependencies | NO | - | - |



<!-- Source Gist 8 of 19: 3fe6e9c14fbaab1a04ac6c04e9b12cc8 -->

Auto-Claude Ideas for BLACKICE

Ideas from Auto-Claude that could improve BLACKICE.

What is Auto-Claude?

An autonomous multi-agent AI coding framework that orchestrates planning, implementation, QA, and deployment without continuous human intervention.

| Aspect | Auto-Claude | BLACKICE |
|---|---|---|
| Focus | Autonomous end-to-end | Iterate-until-success |
| Isolation | Git worktrees | Worktree pool |
| Parallelism | Up to 12 terminals | Worker pool |
| QA | Built-in validation loop | Consensus voting |
| License | AGPL-3.0 | MIT |

Key Features

  1. Multi-Layer Agent System - Planning, implementation, QA, merge agents
  2. Dynamic Command Allowlisting - Stack-aware command restrictions
  3. Three-Layer Security Sandbox - OS, filesystem, command filtering
  4. Self-Validating QA Loop - Catches issues before human review
  5. Memory Persistence - Insights retained across sessions

Ideas Worth Adopting

1. Dynamic Command Allowlisting

What it is: Detect project stack and restrict commands to those relevant.

Current BLACKICE approach: Static command restrictions.

Why adopt: Python projects shouldn't run npm. Node projects shouldn't run pip. Reduce attack surface.

Implementation sketch:

from dataclasses import dataclass
from pathlib import Path

@dataclass
class StackProfile:
    name: str
    indicators: list[str]  # Files that indicate this stack
    allowed_commands: list[str]
    package_managers: list[str]
    test_commands: list[str]
    build_commands: list[str]

STACK_PROFILES = [
    StackProfile(
        name="python",
        indicators=["pyproject.toml", "setup.py", "requirements.txt", "Pipfile"],
        allowed_commands=["python", "python3", "pip", "uv", "pytest", "ruff", "mypy"],
        package_managers=["pip", "uv", "pipenv", "poetry"],
        test_commands=["pytest", "python -m pytest", "python -m unittest"],
        build_commands=["python -m build", "pip wheel"]
    ),
    StackProfile(
        name="node",
        indicators=["package.json", "yarn.lock", "pnpm-lock.yaml"],
        allowed_commands=["node", "npm", "npx", "yarn", "pnpm", "bun", "tsx"],
        package_managers=["npm", "yarn", "pnpm", "bun"],
        test_commands=["npm test", "yarn test", "jest", "vitest"],
        build_commands=["npm run build", "yarn build"]
    ),
    StackProfile(
        name="rust",
        indicators=["Cargo.toml"],
        allowed_commands=["cargo", "rustc", "rustup", "rustfmt", "clippy"],
        package_managers=["cargo"],
        test_commands=["cargo test"],
        build_commands=["cargo build"]
    ),
    StackProfile(
        name="go",
        indicators=["go.mod", "go.sum"],
        allowed_commands=["go", "gofmt", "golint"],
        package_managers=["go mod"],
        test_commands=["go test"],
        build_commands=["go build"]
    ),
]

class StackDetector:
    """Detect project stack from files."""

    def detect(self, project_root: Path) -> list[StackProfile]:
        """Detect all stacks in project."""
        detected = []
        for profile in STACK_PROFILES:
            for indicator in profile.indicators:
                if (project_root / indicator).exists():
                    detected.append(profile)
                    break
        return detected

class DynamicAllowlist:
    """Restrict commands based on detected stack."""

    def __init__(self, project_root: Path):
        self.detector = StackDetector()
        self.stacks = self.detector.detect(project_root)
        self.allowed = self._build_allowlist()

    def _build_allowlist(self) -> set[str]:
        """Build combined allowlist from all detected stacks."""
        allowed = {"git", "ls", "cat", "grep", "find", "mkdir", "cp", "mv"}  # Always allowed
        for stack in self.stacks:
            allowed.update(stack.allowed_commands)
        return allowed

    def is_allowed(self, command: str) -> bool:
        """Check if command is allowed for this project."""
        # Extract base command
        parts = command.split()
        if not parts:
            return False

        base_cmd = parts[0]
        return base_cmd in self.allowed

    def get_test_command(self) -> str | None:
        """Get appropriate test command for stack."""
        if self.stacks:
            return self.stacks[0].test_commands[0]
        return None
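Stripped of the dataclasses, the detection logic is just "does any indicator file exist?". A self-contained illustration, with indicator lists abbreviated from the profiles above:

```python
import tempfile
from pathlib import Path

# Indicator files per stack (subset of the profiles above)
INDICATORS = {
    "python": ["pyproject.toml", "requirements.txt"],
    "node": ["package.json"],
}

def detect_stacks(root: Path) -> list[str]:
    """A stack is active if any of its indicator files exists."""
    return [name for name, files in INDICATORS.items()
            if any((root / f).exists() for f in files)]

with tempfile.TemporaryDirectory() as d:
    root = Path(d)
    (root / "pyproject.toml").touch()
    print(detect_stacks(root))  # → ['python']
```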

Effort: Medium

Verdict: YES - Smarter command restrictions.


2. Self-Validating QA Loop

What it is: Automatically test generated code before flagging for human review.

Current BLACKICE approach: Consensus votes on correctness.

Why adopt: Don't waste human time on broken code. Catch issues early.

Implementation sketch:

from dataclasses import dataclass, field

@dataclass
class LintResult:
    passed: bool
    issues: list[str]

@dataclass
class QAResult:
    passed: bool
    build_status: bool
    test_status: bool
    lint_status: bool
    coverage: float
    issues: list[str]
    changed_files: list[Path] = field(default_factory=list)  # read by fix_and_retry

class SelfValidatingQA:
    """Automatic QA before human review."""

    def __init__(self, project_root: Path, allowlist: DynamicAllowlist):
        self.root = project_root
        self.allowlist = allowlist

    async def validate(self, changes: list[Path]) -> QAResult:
        """Run full QA pipeline on changes."""
        issues = []

        # 1. Lint check
        lint_result = await self._run_lint(changes)
        if not lint_result.passed:
            issues.extend(lint_result.issues)

        # 2. Type check (if applicable)
        type_result = await self._run_typecheck(changes)
        if not type_result.passed:
            issues.extend(type_result.issues)

        # 3. Build check
        build_result = await self._run_build()
        if not build_result.passed:
            issues.extend(build_result.issues)
            # Don't proceed to tests if build fails
            return QAResult(
                passed=False,
                build_status=False,
                test_status=False,
                lint_status=lint_result.passed,
                coverage=0,
                issues=issues
            )

        # 4. Test run
        test_result = await self._run_tests()
        if not test_result.passed:
            issues.extend(test_result.issues)

        # 5. Coverage check
        coverage = await self._get_coverage()

        return QAResult(
            passed=len(issues) == 0,
            build_status=build_result.passed,
            test_status=test_result.passed,
            lint_status=lint_result.passed,
            coverage=coverage,
            issues=issues
        )

    async def _run_lint(self, files: list[Path]):
        """Run linter on changed files."""
        if "ruff" in self.allowlist.allowed:
            cmd = f"ruff check {' '.join(str(f) for f in files)}"
        elif "eslint" in self.allowlist.allowed:
            cmd = f"eslint {' '.join(str(f) for f in files)}"
        else:
            return LintResult(passed=True, issues=[])

        result = await self._exec(cmd)
        return self._parse_lint_output(result)

    async def fix_and_retry(self, qa_result: QAResult, agent) -> QAResult:
        """Have agent fix issues and re-run QA."""
        if qa_result.passed:
            return qa_result

        # Give agent the issues to fix
        fix_prompt = f"""
The following QA issues were found:

{chr(10).join(f"- {issue}" for issue in qa_result.issues)}

Please fix these issues. Do not change functionality, only fix the issues.
"""
        await agent.run(fix_prompt)

        # Re-run QA
        return await self.validate(qa_result.changed_files)

Effort: Medium

Verdict: YES - Reduces human review burden.


3. Memory Persistence Across Sessions

What it is: Agents remember codebase insights across sessions.

Current BLACKICE approach: Beads stores events, but not structured insights.

Why adopt: Don't re-learn the same things. Faster subsequent runs.

Implementation sketch:

@dataclass
class CodebaseInsight:
    category: str  # "pattern", "antipattern", "preference", "constraint"
    description: str
    confidence: float
    source: str  # Where this was learned
    created_at: datetime
    last_used: datetime

class InsightMemory:
    """Persistent memory of codebase insights."""

    def __init__(self, db_path: Path):
        self.db = sqlite3.connect(db_path)
        self.db.row_factory = sqlite3.Row  # enables row["column"] access below
        self._init_schema()

    def _init_schema(self):
        self.db.execute("""
            CREATE TABLE IF NOT EXISTS insights (
                id TEXT PRIMARY KEY,
                category TEXT,
                description TEXT,
                confidence REAL,
                source TEXT,
                created_at TEXT,
                last_used TEXT,
                use_count INTEGER DEFAULT 0
            )
        """)

    def add_insight(self, insight: CodebaseInsight):
        """Add or update insight."""
        self.db.execute("""
            INSERT INTO insights (id, category, description, confidence, source, created_at, last_used)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(id) DO UPDATE SET
                confidence = (confidence + excluded.confidence) / 2,
                last_used = excluded.last_used,
                use_count = use_count + 1
        """, (
            self._hash(insight.description),
            insight.category,
            insight.description,
            insight.confidence,
            insight.source,
            insight.created_at.isoformat(),
            insight.last_used.isoformat()
        ))
        self.db.commit()

    def get_relevant(self, context: str, limit: int = 10) -> list[CodebaseInsight]:
        """Get insights relevant to current context."""
        # Simple keyword matching (could use embeddings)
        keywords = set(context.lower().split())

        cursor = self.db.execute("""
            SELECT * FROM insights
            ORDER BY confidence DESC, use_count DESC
            LIMIT ?
        """, (limit * 3,))  # Over-fetch, then filter

        results = []
        for row in cursor.fetchall():
            desc_keywords = set(row["description"].lower().split())
            if keywords & desc_keywords:  # Any overlap
                results.append(CodebaseInsight(
                    category=row["category"],
                    description=row["description"],
                    confidence=row["confidence"],
                    source=row["source"],
                    created_at=datetime.fromisoformat(row["created_at"]),
                    last_used=datetime.fromisoformat(row["last_used"]),
                ))

        return results[:limit]

    def format_for_prompt(self, insights: list[CodebaseInsight]) -> str:
        """Format insights for agent prompt."""
        if not insights:
            return ""

        lines = ["## Codebase Insights (from previous sessions)", ""]
        for i in insights:
            lines.append(f"- **{i.category}**: {i.description} (confidence: {i.confidence:.0%})")

        return "\n".join(lines)

# Auto-learn insights from agent conversations
import re

class InsightExtractor:
    """Extract insights from agent outputs."""

    INSIGHT_PATTERNS = [
        (r"I noticed that this codebase (.+)", "pattern"),
        (r"This project (?:prefers|uses) (.+)", "preference"),
        (r"Avoid (.+) because (.+)", "antipattern"),
        (r"This codebase requires (.+)", "constraint"),
    ]

    def extract(self, agent_output: str) -> list[CodebaseInsight]:
        insights = []
        for pattern, category in self.INSIGHT_PATTERNS:
            matches = re.findall(pattern, agent_output, re.IGNORECASE)
            for match in matches:
                insights.append(CodebaseInsight(
                    category=category,
                    description=match if isinstance(match, str) else " ".join(match),
                    confidence=0.7,  # Initial confidence
                    source="agent_output",
                    created_at=datetime.now(),
                    last_used=datetime.now()
                ))
        return insights
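One subtlety in the extractor above: `re.findall` returns plain strings for single-group patterns but tuples for multi-group ones, which is why the sketch joins tuple matches with `" ".join(match)`. For example:

```python
import re

single = re.findall(r"This project (?:prefers|uses) (.+)",
                    "This project uses pathlib.", re.IGNORECASE)
multi = re.findall(r"Avoid (.+) because (.+)",
                   "Avoid shell=True because it enables injection.", re.IGNORECASE)

print(single)  # → ['pathlib.']
print(multi)   # → [('shell=True', 'it enables injection.')]
```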

Effort: Medium

Verdict: YES - Learn and remember.


4. Three-Layer Security Sandbox

What it is: OS isolation + filesystem restrictions + command filtering.

Current BLACKICE approach: Command filtering only.

Why adopt: Defense in depth. Multiple layers of protection.

Implementation sketch:

import asyncio
import os
import re
import tempfile
from pathlib import Path

class SecuritySandbox:
    """Three-layer security sandbox for agent execution."""

    def __init__(self, project_root: Path, allowlist: DynamicAllowlist):
        self.root = project_root.resolve()
        self.allowlist = allowlist
        self.allowed_paths = self._build_allowed_paths()

    def _build_allowed_paths(self) -> set[Path]:
        """Build set of paths agent can access."""
        allowed = {
            self.root,
            Path(tempfile.gettempdir()),
            Path.home() / ".cache",
        }
        # Add all subdirectories of project
        for p in self.root.rglob("*"):
            if p.is_dir():
                allowed.add(p)
        return allowed

    # Layer 1: Filesystem restrictions
    def check_path(self, path: str | Path) -> bool:
        """Check if path is within allowed boundaries."""
        try:
            resolved = Path(path).resolve()

            # Never allow system paths
            forbidden = ["/etc", "/usr", "/bin", "/sbin", "/var", "/root"]
            if any(str(resolved).startswith(f) for f in forbidden):
                return False

            # Check against allowed paths
            for allowed in self.allowed_paths:
                try:
                    resolved.relative_to(allowed)
                    return True
                except ValueError:
                    continue

            return False
        except Exception:
            return False

    # Layer 2: Command filtering
    def check_command(self, command: str) -> tuple[bool, str | None]:
        """Check if command is allowed."""
        # First check allowlist
        if not self.allowlist.is_allowed(command):
            return False, "Command not allowed for this project stack"

        # Then check for dangerous patterns
        dangerous = [
            (r"rm\s+-rf\s+/", "Recursive delete of root"),
            (r">\s*/etc/", "Write to /etc"),
            (r"chmod\s+777", "Insecure permissions"),
            (r"\|\s*sh", "Pipe to shell"),
        ]
        for pattern, reason in dangerous:
            if re.search(pattern, command):
                return False, reason

        return True, None

    # Layer 3: Environment isolation
    def get_safe_env(self) -> dict:
        """Get sanitized environment for subprocess."""
        safe_env = {}
        allowed_vars = [
            "PATH", "HOME", "USER", "LANG", "LC_ALL",
            "PYTHONPATH", "NODE_PATH", "GOPATH",
            "TERM", "SHELL",
        ]
        for var in allowed_vars:
            if var in os.environ:
                safe_env[var] = os.environ[var]

        # Remove potentially dangerous vars
        safe_env.pop("LD_PRELOAD", None)
        safe_env.pop("LD_LIBRARY_PATH", None)

        return safe_env

    async def execute(self, command: str) -> tuple[bool, str]:
        """Execute command within sandbox."""
        # Check command
        allowed, reason = self.check_command(command)
        if not allowed:
            return False, f"Blocked: {reason}"

        # Execute with restrictions
        try:
            result = await asyncio.create_subprocess_shell(
                command,
                cwd=self.root,
                env=self.get_safe_env(),
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            stdout, stderr = await result.communicate()
            return result.returncode == 0, stdout.decode() + stderr.decode()
        except Exception as e:
            return False, str(e)
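Layer 1 above hinges on resolving paths before comparing them: `Path.resolve()` collapses `..` segments and symlinks, so traversal attempts fail the `relative_to` check. A self-contained sketch of just the containment test:

```python
from pathlib import Path

def is_within(root: Path, candidate: str) -> bool:
    """True if candidate resolves to a location under root."""
    try:
        Path(candidate).resolve().relative_to(root.resolve())
        return True
    except ValueError:
        return False

root = Path("/tmp/project")
print(is_within(root, "/tmp/project/src/app.py"))     # → True
print(is_within(root, "/tmp/project/../etc/passwd"))  # → False
```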

Effort: Medium

Verdict: YES - Essential for production.


Ideas NOT Worth Adopting

AGPL License Model

Why skip: Too restrictive for some use cases. BLACKICE is MIT.

Electron Desktop App

Why skip: CLI is more flexible. Desktop app is unnecessary.


Summary

| Feature | Worth Adopting? | Effort | Priority |
|---|---|---|---|
| Dynamic Command Allowlisting | YES | Medium | High |
| Self-Validating QA Loop | YES | Medium | High |
| Memory Persistence | YES | Medium | Medium |
| Three-Layer Sandbox | YES | Medium | Medium |



<!-- Source Gist 9 of 19: ea58818ae51813ac3f0f821dd7f77cc0 -->

Continuous Claude v2 Ideas for BLACKICE

Ideas from Continuous Claude v2 that could improve BLACKICE.

What is Continuous Claude v2?

A lossless state preservation system that maintains project continuity across sessions through ledgers, handoffs, and artifact indexing.

| Aspect | Continuous Claude | BLACKICE |
|---|---|---|
| Focus | Session continuity | Iterate-until-success |
| Memory | Ledgers + Handoffs | Beads event store |
| Learning | Artifact index + Braintrust | Reflexion |
| Agents | Plan → Validate → Implement | Supervisor + Workers |

Key Features

  1. Continuity Ledger - Lossless session state snapshots
  2. Handoff System - Structured session transfer documents
  3. Artifact Index - SQLite+FTS5 searchable database
  4. 10 Hook Types - Lifecycle event interception
  5. TDD Workflow - Test-first implementation

Ideas Worth Adopting

1. Continuity Ledger

What it is: Lossless state snapshots instead of lossy compaction.

Current BLACKICE approach: Beads events (similar, but less structured).

Why adopt: Explicit ledger format is easier to read/debug than event replay.

Implementation sketch:

from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Literal

import yaml

@dataclass
class LedgerEntry:
    timestamp: datetime
    phase: str
    status: Literal["started", "completed", "blocked", "failed"]
    decision: str | None
    rationale: str | None
    artifacts: list[str]
    learnings: list[str]

@dataclass
class ContinuityLedger:
    task_id: str
    tech_stack: dict
    phases: list[LedgerEntry]
    decisions: list[dict]
    learnings: list[str]
    rules_generated: list[str]

class LedgerManager:
    """Manage continuity ledgers."""

    def __init__(self, base_path: Path):
        self.base_path = base_path

    def get_ledger_path(self, task_id: str) -> Path:
        return self.base_path / "thoughts" / "ledgers" / f"CONTINUITY_{task_id}.md"

    def load(self, task_id: str) -> ContinuityLedger | None:
        path = self.get_ledger_path(task_id)
        if not path.exists():
            return None
        return self._parse_ledger(path.read_text())

    def save(self, ledger: ContinuityLedger):
        path = self.get_ledger_path(ledger.task_id)
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(self._format_ledger(ledger))

    def append_entry(self, task_id: str, entry: LedgerEntry):
        ledger = self.load(task_id) or ContinuityLedger(task_id, {}, [], [], [], [])
        ledger.phases.append(entry)
        self.save(ledger)

    def _format_ledger(self, ledger: ContinuityLedger) -> str:
        lines = [
            f"# Continuity Ledger: {ledger.task_id}",
            "",
            "## Tech Stack",
            yaml.dump(ledger.tech_stack),
            "",
            "## Phases",
        ]

        for entry in ledger.phases:
            lines.append(f"\n### {entry.phase} ({entry.status})")
            lines.append(f"- **Time**: {entry.timestamp}")
            if entry.decision:
                lines.append(f"- **Decision**: {entry.decision}")
            if entry.rationale:
                lines.append(f"- **Rationale**: {entry.rationale}")
            if entry.learnings:
                lines.append("- **Learnings**:")
                for l in entry.learnings:
                    lines.append(f"  - {l}")

        lines.extend([
            "",
            "## Accumulated Learnings",
            *[f"- {l}" for l in ledger.learnings],
            "",
            "## Generated Rules",
            *[f"- {r}" for r in ledger.rules_generated],
        ])

        return "\n".join(lines)

Effort: Medium

Verdict: YES - Better than raw event streams.


2. Handoff System

What it is: Structured documents enabling agent-to-agent or session-to-session context transfer.

Current BLACKICE approach: No explicit handoff mechanism.

Why adopt: Clean context transfer. Multi-agent coordination.

Implementation sketch:

from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from uuid import uuid4

@dataclass
class Handoff:
    id: str
    from_agent: str
    to_agent: str | None  # None = next session
    created_at: datetime

    # Context
    task_summary: str
    current_phase: str
    completed_work: list[str]
    remaining_work: list[str]

    # State
    key_decisions: list[dict]
    open_questions: list[str]
    blockers: list[str]

    # Artifacts
    artifacts_created: list[str]
    files_modified: list[str]

    # Learnings
    what_worked: list[str]
    what_didnt: list[str]
    recommendations: list[str]

class HandoffManager:
    """Manage agent handoffs."""

    def __init__(self, base_path: Path):
        self.base_path = base_path / "thoughts" / "shared" / "handoffs"
        self.base_path.mkdir(parents=True, exist_ok=True)

    def create_handoff(self, agent_id: str, task: Task, state: dict) -> Handoff:
        """Create handoff from current agent state."""
        handoff = Handoff(
            id=f"handoff-{uuid4().hex[:8]}",
            from_agent=agent_id,
            to_agent=None,
            created_at=datetime.now(),
            task_summary=task.description,
            current_phase=state.get("phase", "unknown"),
            completed_work=state.get("completed", []),
            remaining_work=state.get("remaining", []),
            key_decisions=state.get("decisions", []),
            open_questions=state.get("questions", []),
            blockers=state.get("blockers", []),
            artifacts_created=state.get("artifacts", []),
            files_modified=state.get("files", []),
            what_worked=state.get("worked", []),
            what_didnt=state.get("failed", []),
            recommendations=state.get("recommendations", [])
        )

        self._save(handoff)
        return handoff

    def get_latest(self, task_id: str) -> Handoff | None:
        """Get most recent handoff for task."""
        pattern = f"*{task_id}*.md"
        handoffs = sorted(self.base_path.glob(pattern), key=lambda p: p.stat().st_mtime)
        if not handoffs:
            return None
        return self._load(handoffs[-1])

    def to_prompt(self, handoff: Handoff) -> str:
        """Convert handoff to agent prompt."""
        return f"""
## Handoff from Previous Session

### Task Summary
{handoff.task_summary}

### Current Phase
{handoff.current_phase}

### Completed Work
{chr(10).join(f"- {w}" for w in handoff.completed_work)}

### Remaining Work
{chr(10).join(f"- {w}" for w in handoff.remaining_work)}

### Key Decisions Made
{chr(10).join(f"- {d['decision']}: {d['rationale']}" for d in handoff.key_decisions)}

### Open Questions
{chr(10).join(f"- {q}" for q in handoff.open_questions)}

### Recommendations
{chr(10).join(f"- {r}" for r in handoff.recommendations)}

---
Continue from where the previous session left off.
"""

Effort: Medium

Verdict: YES - Essential for multi-agent coordination.


3. Artifact Index (SQLite+FTS5)

What it is: Searchable database of agent outputs, decisions, and patterns.

Current BLACKICE approach: Beads stores events but limited search.

Why adopt: Fast full-text search across all historical artifacts.

Implementation sketch:

import json
import sqlite3
from datetime import datetime
from pathlib import Path

class ArtifactIndex:
    """SQLite+FTS5 searchable artifact index."""

    def __init__(self, db_path: Path):
        self.conn = sqlite3.connect(db_path)
        self.conn.row_factory = sqlite3.Row  # needed so dict(row) works in the queries below
        self._init_schema()

    def _init_schema(self):
        self.conn.executescript("""
            CREATE TABLE IF NOT EXISTS artifacts (
                id TEXT PRIMARY KEY,
                task_id TEXT,
                type TEXT,
                title TEXT,
                content TEXT,
                created_at TIMESTAMP,
                metadata JSON
            );

            CREATE VIRTUAL TABLE IF NOT EXISTS artifacts_fts USING fts5(
                title, content, task_id,
                content='artifacts',
                content_rowid='rowid'
            );

            CREATE TRIGGER IF NOT EXISTS artifacts_ai AFTER INSERT ON artifacts BEGIN
                INSERT INTO artifacts_fts(rowid, title, content, task_id)
                VALUES (new.rowid, new.title, new.content, new.task_id);
            END;
        """)

    def add(self, artifact: dict):
        """Add artifact to index."""
        self.conn.execute("""
            INSERT INTO artifacts (id, task_id, type, title, content, created_at, metadata)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (
            artifact["id"],
            artifact["task_id"],
            artifact["type"],
            artifact["title"],
            artifact["content"],
            datetime.now().isoformat(),
            json.dumps(artifact.get("metadata", {}))
        ))
        self.conn.commit()

    def search(self, query: str, limit: int = 10) -> list[dict]:
        """Full-text search across artifacts."""
        cursor = self.conn.execute("""
            SELECT a.*, highlight(artifacts_fts, 1, '<mark>', '</mark>') as snippet
            FROM artifacts a
            JOIN artifacts_fts ON a.rowid = artifacts_fts.rowid
            WHERE artifacts_fts MATCH ?
            ORDER BY rank
            LIMIT ?
        """, (query, limit))

        return [dict(row) for row in cursor.fetchall()]

    def get_by_type(self, artifact_type: str, limit: int = 50) -> list[dict]:
        """Get artifacts by type."""
        cursor = self.conn.execute("""
            SELECT * FROM artifacts
            WHERE type = ?
            ORDER BY created_at DESC
            LIMIT ?
        """, (artifact_type, limit))

        return [dict(row) for row in cursor.fetchall()]

    def find_similar_decisions(self, query: str) -> list[dict]:
        """Find past decisions similar to current situation."""
        # 'type' is not an FTS5 column (only title, content, task_id are indexed),
        # so filter on the base table after the full-text search
        return [r for r in self.search(query) if r["type"] == "decision"]

# Usage
index = ArtifactIndex(Path(".agent/artifact-index.db"))

# Index a decision
index.add({
    "id": "dec-123",
    "task_id": "task-456",
    "type": "decision",
    "title": "Use PostgreSQL over SQLite",
    "content": "Chose PostgreSQL for production due to concurrent write requirements...",
    "metadata": {"confidence": 0.9}
})

# Search later
similar = index.search("database choice concurrent writes")

Effort: Medium

Verdict: YES - Searchable history is powerful.


4. Validation Funnel

What it is: Pre-implementation checks against precedent and best practices.

Current BLACKICE approach: Execute then check.

Why adopt: Catch issues before wasting execution time.

Implementation sketch:

from dataclasses import dataclass

@dataclass
class Check:
    passed: bool
    message: str

@dataclass
class ValidationResult:
    passed: bool
    checks: list[tuple[str, bool, str]]  # (check_name, passed, message)
    blockers: list[str]
    warnings: list[str]

class ValidationFunnel:
    """Pre-implementation validation pipeline."""

    def __init__(self, artifact_index: ArtifactIndex, web_searcher):
        self.index = artifact_index
        self.web = web_searcher

    async def validate(self, plan: Plan) -> ValidationResult:
        """Run all validation checks."""
        checks = []
        blockers = []
        warnings = []

        # Check 1: Precedent (RAG-judge)
        precedent_check = await self._check_precedent(plan)
        checks.append(("precedent", precedent_check.passed, precedent_check.message))
        if not precedent_check.passed:
            warnings.append(precedent_check.message)

        # Check 2: Best practices (web search)
        practices_check = await self._check_best_practices(plan)
        checks.append(("best_practices", practices_check.passed, practices_check.message))
        if not practices_check.passed:
            warnings.append(practices_check.message)

        # Check 3: Tech stack compatibility
        stack_check = self._check_stack_compatibility(plan)
        checks.append(("stack", stack_check.passed, stack_check.message))
        if not stack_check.passed:
            blockers.append(stack_check.message)

        # Check 4: Resource constraints
        resource_check = self._check_resources(plan)
        checks.append(("resources", resource_check.passed, resource_check.message))
        if not resource_check.passed:
            blockers.append(resource_check.message)

        return ValidationResult(
            passed=len(blockers) == 0,
            checks=checks,
            blockers=blockers,
            warnings=warnings
        )

    async def _check_precedent(self, plan: Plan):
        """Check if similar approaches succeeded before."""
        similar = self.index.search(plan.summary, limit=5)
        if not similar:
            return Check(True, "No precedent found (novel approach)")

        successes = [s for s in similar if s["metadata"].get("outcome") == "success"]
        if len(successes) >= 3:
            return Check(True, f"Found {len(successes)} successful precedents")
        return Check(False, f"Only {len(successes)}/{len(similar)} similar attempts succeeded")

    async def _check_best_practices(self, plan: Plan):
        """Search for best practices and compare."""
        results = await self.web.search(f"{plan.tech_stack} best practices {plan.domain}")
        # LLM comparison of plan vs best practices
        return Check(True, "Aligns with best practices")

Effort: Medium-High

Verdict: YES - Prevent issues before execution.


5. Reasoning History Capture

What it is: Store extended thinking per checkpoint for later recall.

Current BLACKICE approach: Only store outputs, not reasoning.

Why adopt: Recall WHY decisions were made. Debug bad choices.

Implementation sketch:

from dataclasses import dataclass
from datetime import datetime
from pathlib import Path

@dataclass
class ReasoningCapture:
    checkpoint_id: str
    timestamp: datetime
    prompt: str
    thinking: str  # Extended thinking/chain-of-thought
    decision: str
    confidence: float
    alternatives_considered: list[str]

class ReasoningStore:
    """Store and retrieve reasoning history."""

    def __init__(self, base_path: Path):
        self.base_path = base_path / ".git" / "claude" / "reasoning"
        self.base_path.mkdir(parents=True, exist_ok=True)

    def capture(self, commit_hash: str, reasoning: ReasoningCapture):
        """Store reasoning for a commit."""
        path = self.base_path / commit_hash / "reasoning.md"
        path.parent.mkdir(parents=True, exist_ok=True)

        content = f"""# Reasoning for {commit_hash}

## Timestamp
{reasoning.timestamp.isoformat()}

## Prompt
{reasoning.prompt}

## Thinking Process
{reasoning.thinking}

## Decision
{reasoning.decision}

## Confidence
{reasoning.confidence:.0%}

## Alternatives Considered
{chr(10).join(f"- {a}" for a in reasoning.alternatives_considered)}
"""
        path.write_text(content)

    def recall(self, query: str) -> list[ReasoningCapture]:
        """Find past reasoning related to query."""
        # Search through stored reasoning
        results = []
        for path in self.base_path.rglob("reasoning.md"):
            content = path.read_text()
            if query.lower() in content.lower():
                results.append(self._parse(content))
        return results
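`recall()` above is just a case-insensitive substring scan over stored markdown files; a runnable miniature of the same round-trip, using a temp directory in place of `.git/claude/reasoning`:

```python
import tempfile
from pathlib import Path

root = Path(tempfile.mkdtemp())
(root / "abc123").mkdir()
(root / "abc123" / "reasoning.md").write_text(
    "# Reasoning for abc123\n\n## Decision\nUse PostgreSQL for concurrent writes\n"
)

def recall(query: str) -> list[str]:
    # same case-insensitive substring scan ReasoningStore.recall performs
    return [p.read_text() for p in root.rglob("reasoning.md")
            if query.lower() in p.read_text().lower()]

hits = recall("postgresql")
misses = recall("mysql")
```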

Effort: Low-Medium

Verdict: YES - Debuggable decision history.


Ideas NOT Worth Adopting

Braintrust Integration

Why skip: External dependency. BLACKICE's Beads is sufficient.

RepoPrompt Dependency

Why skip: Paid tool. Use open alternatives.


Summary

| Feature | Worth Adopting? | Effort | Priority |
|---------|-----------------|--------|----------|
| Handoff System | YES | Medium | High |
| Continuity Ledger | YES | Medium | High |
| Artifact Index (FTS5) | YES | Medium | Medium |
| Validation Funnel | YES | Medium | Medium |
| Reasoning History | YES | Low | Low |

References


<!-- Source Gist 10 of 19: 0eb96e20ff00f58bce4b0a99c4abe06c -->


Claude Code Safety Net Ideas for BLACKICE

Ideas from Claude Code Safety Net for BLACKICE.

What is Claude Code Safety Net?

A plugin that prevents AI agents from executing destructive commands by intercepting bash operations before execution.

| Aspect | Safety Net | BLACKICE |
|--------|------------|----------|
| Focus | Prevent destructive commands | Iterate-until-success |
| Method | Semantic command analysis | SafetyGuard policies |
| Scope | User + Project config | Global config |
| Analysis | Parse flags, unwrap shells | Pattern matching |

Key Features

  1. Semantic Command Analysis - Parses arguments, understands flag combinations
  2. Shell Wrapper Detection - Recursively analyzes bash -c, sh -c
  3. Dual-Scope Config - User-level + project-level rules
  4. Fail-Safe Defaults - Falls back to built-in protections on config errors
  5. Paranoid Mode - Extra strict restrictions

Ideas Worth Adopting

1. Semantic Command Analysis

What it is: Parse flags and understand dangerous combinations, not just prefixes.

Current BLACKICE approach: Simple pattern matching.

Why adopt: git checkout -b is safe, git checkout -- is dangerous. Need to understand context.

Implementation sketch:

import shlex
from dataclasses import dataclass

@dataclass
class ParsedCommand:
    executable: str
    subcommand: str | None
    flags: list[str]
    args: list[str]
    raw: str

class CommandParser:
    """Parse commands into structured format."""

    def parse(self, command: str) -> ParsedCommand:
        parts = shlex.split(command)
        if not parts:
            return ParsedCommand("", None, [], [], command)

        executable = parts[0]
        flags = [p for p in parts[1:] if p.startswith("-")]
        args = [p for p in parts[1:] if not p.startswith("-")]
        subcommand = args[0] if args and not args[0].startswith("/") else None

        return ParsedCommand(
            executable=executable,
            subcommand=subcommand,
            flags=flags,
            args=args,
            raw=command
        )

@dataclass
class DangerRule:
    executable: str
    subcommand: str | None
    dangerous_flags: list[str]
    safe_flags: list[str]  # These make it safe even with dangerous flags
    reason: str

DANGER_RULES = [
    DangerRule(
        executable="git",
        subcommand="checkout",
        dangerous_flags=["--"],
        safe_flags=["-b", "-B"],  # Creating branch is safe
        reason="Discards uncommitted changes"
    ),
    DangerRule(
        executable="git",
        subcommand="reset",
        dangerous_flags=["--hard"],
        safe_flags=[],
        reason="Destroys uncommitted work"
    ),
    DangerRule(
        executable="git",
        subcommand="push",
        dangerous_flags=["--force", "-f"],
        safe_flags=["--force-with-lease"],  # Safer variant
        reason="Rewrites remote history"
    ),
    DangerRule(
        executable="rm",
        subcommand=None,
        dangerous_flags=["-rf", "-r", "-f"],
        safe_flags=[],
        reason="Permanent deletion"
    ),
]

class SemanticAnalyzer:
    """Analyze commands semantically."""

    def __init__(self, rules: list[DangerRule]):
        self.rules = rules
        self.parser = CommandParser()

    def analyze(self, command: str) -> tuple[bool, str | None]:
        """Returns (is_safe, reason if unsafe)."""
        parsed = self.parser.parse(command)

        for rule in self.rules:
            if parsed.executable != rule.executable:
                continue

            if rule.subcommand and parsed.subcommand != rule.subcommand:
                continue

            # Check for safe flags first
            if any(sf in parsed.flags for sf in rule.safe_flags):
                continue

            # Check for dangerous flags
            if any(df in parsed.flags for df in rule.dangerous_flags):
                return False, rule.reason

        return True, None
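The parser's reliance on `shlex.split` rather than `str.split` is what keeps quoted arguments intact, which matters when deciding what is a flag and what is an argument:

```python
import shlex

cmd = 'git commit -m "fix: handle spaces"'

naive = cmd.split()        # whitespace split shreds the quoted message
parsed = shlex.split(cmd)  # shell-aware split keeps it as one argument

print(parsed)
# ['git', 'commit', '-m', 'fix: handle spaces']
```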

Effort: Medium

Verdict: YES - Much safer than regex matching.


2. Shell Wrapper Detection

What it is: Recursively unwrap bash -c, sh -c, python -c to analyze hidden commands.

Current BLACKICE approach: Analyze surface command only.

Why adopt: Agents can hide dangerous commands in shell wrappers.

Implementation sketch:

import shlex

class ShellUnwrapper:
    """Recursively unwrap shell commands."""

    SHELL_WRAPPERS = {
        "bash": ["-c"],
        "sh": ["-c"],
        "zsh": ["-c"],
        "python": ["-c"],
        "python3": ["-c"],
        "node": ["-e"],
        "perl": ["-e"],
    }

    def unwrap(self, command: str) -> list[str]:
        """Extract all nested commands."""
        commands = [command]
        parts = shlex.split(command)

        if len(parts) < 2:
            return commands

        executable = parts[0]
        if executable not in self.SHELL_WRAPPERS:
            return commands

        # Check for shell execution flags
        for i, part in enumerate(parts[1:], 1):
            if part in self.SHELL_WRAPPERS[executable]:
                # Next part is the command
                if i + 1 < len(parts):
                    inner_command = parts[i + 1]
                    # Recursively unwrap
                    commands.extend(self.unwrap(inner_command))
                break

        return commands

    def analyze_all(self, command: str, analyzer: SemanticAnalyzer) -> tuple[bool, str | None]:
        """Analyze command and all nested commands."""
        all_commands = self.unwrap(command)

        for cmd in all_commands:
            is_safe, reason = analyzer.analyze(cmd)
            if not is_safe:
                return False, f"Nested command '{cmd}': {reason}"

        return True, None

# Example
unwrapper = ShellUnwrapper()

# This will detect the dangerous rm inside bash -c
command = 'bash -c "rm -rf /important/data"'
commands = unwrapper.unwrap(command)
# Returns: ['bash -c "rm -rf /important/data"', 'rm -rf /important/data']

Effort: Low-Medium

Verdict: YES - Critical for security.


3. Dual-Scope Configuration

What it is: User-level defaults + project-level overrides.

Current BLACKICE approach: Global config only.

Why adopt: Different projects have different safety needs.

Implementation sketch:

import json
from dataclasses import dataclass
from pathlib import Path

@dataclass
class SafetyConfig:
    blocked_commands: list[DangerRule]
    allowed_paths: list[str]  # Safe to delete in these paths
    strict_mode: bool
    paranoid_mode: bool

class ConfigLoader:
    """Load safety config from multiple scopes."""

    USER_CONFIG = Path("~/.cc-safety-net/config.json").expanduser()
    PROJECT_CONFIG = Path(".safety-net.json")

    def load(self) -> SafetyConfig:
        """Load and merge configs (project wins on conflicts)."""
        user_config = self._load_file(self.USER_CONFIG)
        project_config = self._load_file(self.PROJECT_CONFIG)

        return self._merge(user_config, project_config)

    def _load_file(self, path: Path) -> dict:
        if not path.exists():
            return {}
        try:
            return json.loads(path.read_text())
        except json.JSONDecodeError:
            # Fail safe: return empty, don't crash
            return {}

    def _merge(self, user: dict, project: dict) -> SafetyConfig:
        """Project config overrides user config."""
        merged = {**user, **project}

        # Special handling: blocked_commands are additive
        blocked = user.get("blocked_commands", []) + project.get("blocked_commands", [])
        merged["blocked_commands"] = self._dedupe_rules(blocked)

        # Fill defaults so SafetyConfig(**merged) never fails on missing keys
        merged.setdefault("allowed_paths", [])
        merged.setdefault("strict_mode", False)
        merged.setdefault("paranoid_mode", False)

        return SafetyConfig(**merged)

# Project-specific config example
# .safety-net.json
{
    "allowed_paths": ["/tmp", "./build", "./dist"],
    "blocked_commands": [
        {
            "executable": "docker",
            "subcommand": "system prune",
            "reason": "Don't clean Docker in this project"
        }
    ],
    "strict_mode": true
}
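The merge rule (project overrides scalars, block-lists accumulate) reduces to plain dict operations; a minimal sketch with illustrative config values:

```python
user = {"strict_mode": False, "allowed_paths": ["/tmp"], "blocked_commands": ["rm -rf"]}
project = {"strict_mode": True, "blocked_commands": ["docker system prune"]}

merged = {**user, **project}  # later dict wins, so project overrides scalar keys
merged["blocked_commands"] = (
    user.get("blocked_commands", []) + project.get("blocked_commands", [])
)  # block rules accumulate instead of being overridden
```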

Effort: Low

Verdict: YES - Flexible, safe defaults.


4. Fail-Safe Defaults

What it is: If config is malformed, fall back to built-in protections.

Current BLACKICE approach: Crash on bad config.

Why adopt: Safety should never be compromised by config errors.

Implementation sketch:

import json
import logging
from pathlib import Path

logger = logging.getLogger(__name__)

DEFAULT_BLOCKED = [
    DangerRule("rm", None, ["-rf"], [], "Permanent deletion"),
    DangerRule("git", "push", ["--force"], ["--force-with-lease"], "Force push"),
    DangerRule("git", "reset", ["--hard"], [], "Hard reset"),
    DangerRule("git", "clean", ["-f"], [], "Clean untracked"),
    DangerRule("chmod", None, ["777"], [], "Insecure permissions"),
    DangerRule("curl", None, ["|", "bash"], [], "Pipe to shell"),
]

class SafetyGuard:
    """Guard with fail-safe defaults."""

    def __init__(self, config_path: Path = None):
        self.config = self._load_config_safely(config_path)

    def _load_config_safely(self, path: Path) -> SafetyConfig:
        """Load config, fall back to defaults on any error."""
        try:
            if path and path.exists():
                data = json.loads(path.read_text())
                return SafetyConfig(**data)
        except Exception as e:
            # Log but don't crash
            logger.warning(f"Config error, using defaults: {e}")

        # Return safe defaults
        return SafetyConfig(
            blocked_commands=DEFAULT_BLOCKED,
            allowed_paths=["/tmp", "/var/tmp"],
            strict_mode=False,
            paranoid_mode=False
        )

    def check(self, command: str) -> tuple[bool, str | None]:
        """Check if command is safe. Always returns valid result."""
        try:
            return self._analyze(command)
        except Exception as e:
            # On any analysis error, block the command
            logger.error(f"Analysis error, blocking: {e}")
            return False, "Analysis failed - blocked for safety"

Effort: Low

Verdict: YES - Defense in depth.


5. Paranoid Mode

What it is: Extra-strict mode that blocks even slightly risky operations.

Current BLACKICE approach: Single strictness level.

Why adopt: High-security environments need extra protection.

Implementation sketch:

class ParanoidGuard(SafetyGuard):
    """Extra-strict safety guard."""

    PARANOID_RULES = [
        # Block ALL interpreter one-liners
        DangerRule("python", None, ["-c"], [], "Interpreter execution"),
        DangerRule("node", None, ["-e"], [], "Interpreter execution"),
        DangerRule("perl", None, ["-e"], [], "Interpreter execution"),

        # Block network operations
        DangerRule("curl", None, [], [], "Network fetch"),
        DangerRule("wget", None, [], [], "Network fetch"),

        # Block any rm (not just -rf)
        DangerRule("rm", None, [], [], "Any deletion"),

        # Block sudo entirely
        DangerRule("sudo", None, [], [], "Elevated privileges"),
    ]

    def __init__(self, config_path: Path = None):
        super().__init__(config_path)
        if self.config.paranoid_mode:
            self.config.blocked_commands.extend(self.PARANOID_RULES)

# Usage
guard = ParanoidGuard()  # Enable with config: {"paranoid_mode": true}

Effort: Low

Verdict: YES - Options for high-security environments.


Ideas NOT Worth Adopting

Claude-Specific Hook System

Why skip: BLACKICE should remain model-agnostic.


Summary

| Feature | Worth Adopting? | Effort | Priority |
|---------|-----------------|--------|----------|
| Semantic Command Analysis | YES | Medium | High |
| Shell Wrapper Detection | YES | Low | High |
| Fail-Safe Defaults | YES | Low | High |
| Dual-Scope Configuration | YES | Low | Medium |
| Paranoid Mode | YES | Low | Low |

References


<!-- Source Gist 11 of 19: bdf398007302c18632c1784c8e092ac3 -->


Claude Workflow v2 Ideas for BLACKICE

Ideas from Claude Workflow v2 for BLACKICE.

What is Claude Workflow v2?

A comprehensive Claude Code plugin with 7 specialized agents, 17 commands, 6 skills, and 9 hooks for intelligent software development workflows.

| Aspect | Claude Workflow v2 | BLACKICE |
|--------|--------------------|----------|
| Focus | Agent orchestration via plugins | Iterate-until-success |
| Agents | 7 specialized (reviewer, debugger, etc.) | Supervisor + Workers |
| Config | Markdown files | Python/YAML |
| Hooks | 9 types (security, formatting, etc.) | Limited hooks |

Key Features

  1. 7 Specialized Agents - Orchestrator, code-reviewer, debugger, docs-writer, security-auditor, refactorer, test-architect
  2. Proactive Agent Spawning - Context-triggered activation
  3. Multi-Step Commands - /commit-push-pr chains operations
  4. Skill-Based Knowledge - External files provide domain guidance
  5. Cascading Verification - Parallel sub-agent validation

Ideas Worth Adopting

1. Proactive Agent Spawning

What it is: Agents activate based on context, not explicit commands.

Current BLACKICE approach: Explicit agent selection.

Why adopt: Friction-free workflows. Agent selection becomes automatic.

Implementation sketch:

import re
from dataclasses import dataclass

@dataclass
class ActivationTrigger:
    agent_type: str
    patterns: list[str]  # Regex patterns
    keywords: list[str]
    context_requirements: list[str]  # e.g., "has_test_files"

ACTIVATION_TRIGGERS = [
    ActivationTrigger(
        agent_type="code_reviewer",
        patterns=[r"review\s+(?:this|the|my)\s+code", r"check\s+for\s+issues"],
        keywords=["review", "audit", "check quality"],
        context_requirements=[]
    ),
    ActivationTrigger(
        agent_type="security_auditor",
        patterns=[r"security\s+(?:check|audit|scan)", r"vulnerabilit"],
        keywords=["security", "vulnerability", "cve"],
        context_requirements=[]
    ),
    ActivationTrigger(
        agent_type="test_architect",
        patterns=[r"write\s+tests?", r"add\s+(?:unit\s+)?tests?"],
        keywords=["test", "coverage", "tdd"],
        context_requirements=["has_source_files"]
    ),
    ActivationTrigger(
        agent_type="refactorer",
        patterns=[r"refactor", r"clean\s*up", r"restructure"],
        keywords=["refactor", "cleanup", "improve"],
        context_requirements=["has_source_files"]
    ),
]

class ProactiveSpawner:
    """Spawn agents based on context."""

    def __init__(self, triggers: list[ActivationTrigger]):
        self.triggers = triggers

    def detect_agent(self, user_message: str, context: dict) -> str | None:
        """Detect which agent should handle this request."""
        message_lower = user_message.lower()

        for trigger in self.triggers:
            # Check keywords
            if any(kw in message_lower for kw in trigger.keywords):
                if self._check_context(trigger, context):
                    return trigger.agent_type

            # Check patterns
            for pattern in trigger.patterns:
                if re.search(pattern, message_lower):
                    if self._check_context(trigger, context):
                        return trigger.agent_type

        return None  # Use default agent

    def _check_context(self, trigger: ActivationTrigger, context: dict) -> bool:
        for req in trigger.context_requirements:
            if not context.get(req, False):
                return False
        return True

# Usage
spawner = ProactiveSpawner(ACTIVATION_TRIGGERS)
agent_type = spawner.detect_agent(user_message, {"has_source_files": True})
if agent_type:
    agent = spawn_agent(agent_type)
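Trigger matching is ordinary `re.search` over the lowercased message; a minimal check that the review pattern fires while the test pattern stays quiet:

```python
import re

patterns = [r"review\s+(?:this|the|my)\s+code", r"write\s+tests?"]
message = "Could you review my code before I merge?"

# same matching step ProactiveSpawner.detect_agent performs per pattern
fired = [p for p in patterns if re.search(p, message.lower())]
```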

Effort: Low-Medium

Verdict: YES - Better UX than manual selection.


2. Multi-Step Command Chains

What it is: Single command triggers multiple sequential operations.

Current BLACKICE approach: Individual commands.

Why adopt: Common workflows in one command. Less friction.

Implementation sketch:

from dataclasses import dataclass
from typing import Literal

@dataclass
class CommandStep:
    name: str
    command: str
    args: dict
    on_failure: Literal["abort", "continue", "skip"]

@dataclass
class CommandChain:
    name: str
    description: str
    steps: list[CommandStep]

COMMAND_CHAINS = {
    "commit-push-pr": CommandChain(
        name="commit-push-pr",
        description="Stage, commit, push, and create PR",
        steps=[
            CommandStep("stage", "git add", {"files": "."}, "abort"),
            CommandStep("commit", "git commit", {"message": "{message}"}, "abort"),
            CommandStep("push", "git push", {"branch": "{branch}"}, "abort"),
            CommandStep("pr", "gh pr create", {"title": "{title}"}, "continue"),
        ]
    ),
    "test-fix-commit": CommandChain(
        name="test-fix-commit",
        description="Run tests, fix failures, commit fixes",
        steps=[
            CommandStep("test", "pytest", {}, "continue"),
            CommandStep("fix", "agent:fix_failures", {}, "abort"),
            CommandStep("retest", "pytest", {}, "abort"),
            CommandStep("commit", "git commit", {"message": "fix: test failures"}, "continue"),
        ]
    ),
    "review-merge": CommandChain(
        name="review-merge",
        description="Review PR and merge if approved",
        steps=[
            CommandStep("checkout", "git checkout", {"pr": "{pr_number}"}, "abort"),
            CommandStep("review", "agent:code_review", {}, "abort"),
            CommandStep("approve", "gh pr review --approve", {}, "abort"),
            CommandStep("merge", "gh pr merge", {}, "abort"),
        ]
    ),
}

class ChainExecutor:
    """Execute command chains."""

    async def execute(self, chain_name: str, params: dict) -> ChainResult:
        chain = COMMAND_CHAINS[chain_name]
        results = []

        for step in chain.steps:
            # Substitute parameters
            args = {k: v.format(**params) if isinstance(v, str) else v
                    for k, v in step.args.items()}

            try:
                if step.command.startswith("agent:"):
                    result = await self._run_agent(step.command[6:], args)
                else:
                    result = await self._run_command(step.command, args)

                results.append((step.name, "success", result))

            except Exception as e:
                results.append((step.name, "failed", str(e)))

                match step.on_failure:
                    case "abort":
                        return ChainResult(status="aborted", step=step.name, results=results)
                    case "skip" | "continue":
                        continue

        return ChainResult(status="success", results=results)

Effort: Low

Verdict: YES - Workflow efficiency.


3. Cascading Verification

What it is: Spawn parallel sub-agents for verification (build, test, lint, security).

Current BLACKICE approach: Sequential verification.

Why adopt: Faster verification. Independent failure detection.

Implementation sketch:

import asyncio
from dataclasses import dataclass, field
from pathlib import Path

@dataclass
class Verifier:
    name: str
    agent_type: str
    timeout: float
    critical: bool  # If True, failure blocks merge

VERIFIERS = [
    Verifier("build", "build_validator", 300, True),
    Verifier("test", "test_runner", 600, True),
    Verifier("lint", "lint_checker", 60, False),
    Verifier("security", "security_scanner", 120, True),
    Verifier("types", "type_checker", 120, False),
]

class CascadingVerifier:
    """Run multiple verifiers in parallel."""

    async def verify_all(self, changes: list[Path]) -> VerificationReport:
        """Run all verifiers in parallel."""
        tasks = [
            self._run_verifier(v, changes)
            for v in VERIFIERS
        ]

        results = await asyncio.gather(*tasks, return_exceptions=True)

        report = VerificationReport()
        for verifier, result in zip(VERIFIERS, results):
            if isinstance(result, Exception):
                report.add_failure(verifier.name, str(result), verifier.critical)
            elif not result.passed:
                report.add_failure(verifier.name, result.message, verifier.critical)
            else:
                report.add_success(verifier.name, result.message)

        return report

    async def _run_verifier(self, verifier: Verifier, changes: list[Path]):
        """Run single verifier with timeout."""
        agent = spawn_agent(verifier.agent_type)
        try:
            return await asyncio.wait_for(
                agent.verify(changes),
                timeout=verifier.timeout
            )
        except asyncio.TimeoutError:
            return VerifierResult(passed=False, message=f"Timeout after {verifier.timeout}s")

@dataclass
class VerificationReport:
    successes: list[tuple[str, str]] = field(default_factory=list)
    failures: list[tuple[str, str, bool]] = field(default_factory=list)

    @property
    def can_proceed(self) -> bool:
        """True if no critical failures."""
        return not any(critical for _, _, critical in self.failures)

    def format_summary(self) -> str:
        lines = ["## Verification Report", ""]
        for name, msg in self.successes:
            lines.append(f"✅ {name}: {msg}")
        for name, msg, critical in self.failures:
            marker = "❌" if critical else "⚠️"
            lines.append(f"{marker} {name}: {msg}")
        return "\n".join(lines)

Effort: Medium

Verdict: YES - Parallel verification is faster.


4. Tool Permission Scoping

What it is: Commands declare which tools they're allowed to use.

Current BLACKICE approach: All-or-nothing access.

Why adopt: Principle of least privilege. Safer execution.

Implementation sketch:

from dataclasses import dataclass

@dataclass
class ToolPermission:
    tool: str
    allowed_patterns: list[str]  # Allowed argument patterns

@dataclass
class CommandPermissions:
    allowed_tools: list[ToolPermission]
    denied_tools: list[str]

COMMAND_PERMISSIONS = {
    "code_review": CommandPermissions(
        allowed_tools=[
            ToolPermission("Read", ["*"]),
            ToolPermission("Grep", ["*"]),
            ToolPermission("Bash", ["git diff*", "git log*", "git show*"]),
        ],
        denied_tools=["Write", "Edit", "Bash:rm*", "Bash:git push*"]
    ),
    "refactor": CommandPermissions(
        allowed_tools=[
            ToolPermission("Read", ["*"]),
            ToolPermission("Write", ["*.py", "*.ts", "*.js"]),
            ToolPermission("Edit", ["*.py", "*.ts", "*.js"]),
            ToolPermission("Bash", ["git diff*", "pytest*", "npm test*"]),
        ],
        denied_tools=["Bash:rm -rf*", "Bash:git push*"]
    ),
}

class PermissionEnforcer:
    def check(self, command: str, tool: str, args: dict) -> bool:
        perms = COMMAND_PERMISSIONS.get(command)
        if not perms:
            return True  # No restrictions

        # Check denied first
        for denied in perms.denied_tools:
            if self._matches(tool, args, denied):
                return False

        # Check allowed
        for allowed in perms.allowed_tools:
            if allowed.tool == tool:
                if self._args_match(args, allowed.allowed_patterns):
                    return True

        return False  # Not in allowed list
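`_matches` and `_args_match` are left undefined in the sketch; one plausible implementation of the `"git diff*"`-style patterns is stdlib `fnmatch` glob matching (an assumption of this example, not something the sketch specifies):

```python
from fnmatch import fnmatch

# glob-style checks over rendered command strings
allowed = fnmatch("git diff --stat", "git diff*")
denied = fnmatch("git push origin main", "git diff*")
py_file = fnmatch("src/app.py", "*.py")
```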

Effort: Low

Verdict: YES - Security best practice.


Ideas NOT Worth Adopting

Markdown-Native Definitions

Why skip: BLACKICE uses Python/YAML which is more powerful.

Claude-Specific Hooks

Why skip: BLACKICE should remain model-agnostic.


Summary

| Feature | Worth Adopting? | Effort | Priority |
|---------|-----------------|--------|----------|
| Proactive Agent Spawning | YES | Medium | High |
| Multi-Step Command Chains | YES | Low | High |
| Cascading Verification | YES | Medium | Medium |
| Tool Permission Scoping | YES | Low | Medium |

References


<!-- Source Gist 12 of 19: 8c529b7bfea515a8a09db9ed5de4327c -->



Ideas from Acontext for BLACKICE.

What is Acontext?

A context data platform for storing, observing, and optimizing AI agent performance with unified storage and self-learning capabilities.

| Aspect | Acontext | BLACKICE |
|---|---|---|
| Focus | Context storage & learning | Iterate-until-success |
| Storage | PostgreSQL + Redis + S3 | SQLite (Beads) |
| Learning | Experience agent + SOPs | Reflexion |
| API | FastAPI | CLI |

Key Features

  1. Unified Message Storage - Multi-provider LLM message persistence
  2. Background Task Extraction - Automatic TODO detection from conversations
  3. Experience Agent - Learns from successful completions
  4. SOP Generation - Creates reusable procedures from patterns
  5. Artifact Storage - S3-backed file management

Ideas Worth Adopting

1. Background Task Extraction

What it is: Automatically extract TODOs and action items from agent conversations.

Current BLACKICE approach: Manual task tracking.

Why adopt: Don't lose tasks mentioned in conversation. Automatic backlog population.

Implementation sketch:

import re
from dataclasses import dataclass
from typing import Literal

@dataclass
class ExtractedTask:
    description: str
    source: str  # ID of the message it came from
    priority: Literal["high", "medium", "low"]
    due: str | None
class TaskExtractor:
    """Extract tasks from agent conversations."""

    TODO_PATTERNS = [
        r"TODO:\s*(.+)",
        r"FIXME:\s*(.+)",
        r"(?:need to|should|must|have to)\s+(.+?)(?:\.|$)",
        r"(?:later|next|afterwards?),?\s+(.+?)(?:\.|$)",
        r"don't forget to\s+(.+?)(?:\.|$)",
    ]

    PRIORITY_KEYWORDS = {
        "high": ["urgent", "critical", "asap", "immediately", "blocking"],
        "medium": ["soon", "important", "should"],
        "low": ["eventually", "nice to have", "when possible"],
    }

    async def extract_from_conversation(self, messages: list[Message]) -> list[ExtractedTask]:
        """Extract all tasks from conversation history."""
        tasks = []

        for msg in messages:
            content = msg.content.lower()

            for pattern in self.TODO_PATTERNS:
                matches = re.findall(pattern, content, re.IGNORECASE)
                for match in matches:
                    tasks.append(ExtractedTask(
                        description=match.strip(),
                        source=msg.id,
                        priority=self._detect_priority(match),
                        due=self._detect_due_date(match)
                    ))

        # Deduplicate similar tasks
        return self._deduplicate(tasks)

    def _detect_priority(self, text: str) -> str:
        text_lower = text.lower()
        for priority, keywords in self.PRIORITY_KEYWORDS.items():
            if any(kw in text_lower for kw in keywords):
                return priority
        return "medium"

    async def monitor_and_extract(self, beads: BeadsClient):
        """Background task that monitors for new tasks."""
        async for event in beads.subscribe("message_added"):
            tasks = await self.extract_from_conversation([event.message])
            for task in tasks:
                await beads.append_event("task_extracted", task.__dict__)
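
A standalone check of two of the extraction patterns above against an invented sample message; note the overlap between matches, which is why the `_deduplicate` pass is needed:

```python
import re

TODO_PATTERNS = [
    r"TODO:\s*(.+)",
    r"don't forget to\s+(.+?)(?:\.|$)",
]

message = "TODO: add retry logic. Also, don't forget to update the changelog."

found = []
for pattern in TODO_PATTERNS:
    # Collect stripped capture groups from every pattern
    found.extend(m.strip() for m in re.findall(pattern, message, re.IGNORECASE))

print(found)
```

The greedy `(.+)` in the `TODO:` pattern swallows the rest of the line, so the second pattern re-captures part of the same text.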

Effort: Medium

Verdict: YES - Automatic task discovery is valuable.


2. SOP Generation from Success Patterns

What it is: When agent successfully completes a task type multiple times, generate reusable SOP.

Current BLACKICE approach: Reflexion learns but doesn't formalize.

Why adopt: Turn implicit learning into explicit, shareable procedures.

Implementation sketch:

@dataclass
class SOP:
    id: str
    task_type: str
    title: str
    steps: list[str]
    prerequisites: list[str]
    success_criteria: list[str]
    source_tasks: list[str]  # Tasks that contributed to this SOP
    confidence: float

class SOPGenerator:
    """Generate SOPs from successful task patterns."""

    def __init__(self, beads: BeadsClient, llm: LLMAdapter):
        self.beads = beads
        self.llm = llm

    async def find_candidates(self, min_successes: int = 3) -> list[str]:
        """Find task types with enough successes to generate an SOP."""
        query = """
            SELECT task_type, COUNT(*) as success_count
            FROM tasks
            WHERE status = 'success'
            GROUP BY task_type
            HAVING COUNT(*) >= ?
        """
        rows = await self.beads.query(query, (min_successes,))
        return [row[0] for row in rows]

    async def generate_sop(self, task_type: str) -> SOP:
        """Generate SOP from successful task executions."""
        # Get successful task traces
        traces = await self.beads.get_traces(
            task_type=task_type,
            status="success",
            limit=10
        )

        # Extract common patterns using LLM
        prompt = f"""
Analyze these successful task executions and extract a reusable Standard Operating Procedure.

Task type: {task_type}

Successful executions:
{json.dumps([t.summary for t in traces], indent=2)}

Generate an SOP with:
1. Prerequisites (what must be true before starting)
2. Steps (ordered actions to take)
3. Success criteria (how to know it's done)

Format as JSON matching this schema:
{{
    "title": "string",
    "prerequisites": ["string"],
    "steps": ["string"],
    "success_criteria": ["string"]
}}
"""
        response = await self.llm.generate(prompt)
        sop_data = json.loads(response)

        return SOP(
            id=f"sop-{task_type}-{uuid4().hex[:8]}",
            task_type=task_type,
            title=sop_data["title"],
            steps=sop_data["steps"],
            prerequisites=sop_data["prerequisites"],
            success_criteria=sop_data["success_criteria"],
            source_tasks=[t.id for t in traces],
            confidence=min(len(traces) / 10, 1.0)  # More sources = higher confidence, capped at 1.0
        )

    async def apply_sop(self, task: Task) -> str:
        """Inject relevant SOP into task prompt."""
        sop = await self.beads.get_sop(task.task_type)
        if not sop:
            return task.description

        return f"""
## Standard Operating Procedure: {sop.title}

### Prerequisites
{chr(10).join(f"- {p}" for p in sop.prerequisites)}

### Recommended Steps
{chr(10).join(f"{i+1}. {s}" for i, s in enumerate(sop.steps))}

### Success Criteria
{chr(10).join(f"- {c}" for c in sop.success_criteria)}

---

## Your Task
{task.description}

Follow the SOP above unless the task requires deviation.
"""

Effort: Medium

Verdict: YES - Formalized learning is powerful.


3. Artifact-Centric Storage

What it is: Manage generated outputs through file paths, not inline content.

Current BLACKICE approach: Code in Beads events.

Why adopt: Large artifacts bloat context. File references are lightweight.

Implementation sketch:

@dataclass
class Artifact:
    id: str
    task_id: str
    type: Literal["code", "config", "docs", "test", "other"]
    path: Path
    size_bytes: int
    created_at: datetime
    metadata: dict

class ArtifactStore:
    """Store and retrieve task artifacts."""

    def __init__(self, base_path: Path, s3_client=None):
        self.base_path = base_path
        self.s3 = s3_client  # Optional cloud backup

    async def save(self, task_id: str, content: str, artifact_type: str, filename: str) -> Artifact:
        """Save artifact and return reference."""
        artifact_dir = self.base_path / task_id
        artifact_dir.mkdir(parents=True, exist_ok=True)

        path = artifact_dir / filename
        path.write_text(content)

        artifact = Artifact(
            id=str(uuid4()),
            task_id=task_id,
            type=artifact_type,
            path=path,
            size_bytes=len(content.encode()),
            created_at=datetime.now(),
            metadata={"original_filename": filename}
        )

        # Optional: backup to S3
        if self.s3:
            await self.s3.upload(str(path), f"artifacts/{task_id}/{filename}")

        return artifact

    async def get_summary(self, task_id: str) -> str:
        """Get lightweight summary of artifacts (not full content)."""
        artifacts = await self.list(task_id)
        lines = ["## Generated Artifacts", ""]
        for a in artifacts:
            lines.append(f"- `{a.path.name}` ({a.type}, {a.size_bytes} bytes)")
        return "\n".join(lines)

    def get_reference_for_prompt(self, artifact: Artifact) -> str:
        """Get artifact reference for agent prompt (not full content)."""
        return f"[Artifact: {artifact.path.name}] - Use `read_file` tool to access"
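
A minimal end-to-end sketch of the save-then-summarize flow, using a temporary directory in place of the real artifact store:

```python
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    task_dir = Path(tmp) / "task-123"
    task_dir.mkdir(parents=True)

    content = "print('hello')\n"
    path = task_dir / "main.py"
    path.write_text(content)

    # Lightweight summary: names and sizes only, never full content
    lines = ["## Generated Artifacts", ""]
    for p in sorted(task_dir.iterdir()):
        lines.append(f"- `{p.name}` ({p.stat().st_size} bytes)")
    summary = "\n".join(lines)

print(summary)
```

Only the summary string ever reaches the agent's context; the content stays on disk.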

Effort: Low

Verdict: YES - Keep context lean.


4. Session Health Monitoring

What it is: Background agent monitors session health without explicit config.

Current BLACKICE approach: Manual monitoring.

Why adopt: Automatic detection of stuck sessions, runaway costs, etc.

Implementation sketch:

@dataclass
class SessionHealth:
    session_id: str
    status: Literal["healthy", "warning", "critical"]
    issues: list[str]
    metrics: dict

class SessionMonitor:
    """Monitor session health in background."""

    HEALTH_CHECKS = [
        ("iteration_stuck", lambda s: s.current_iteration == s.last_iteration and s.idle_time > 60),
        ("cost_warning", lambda s: s.token_cost > s.budget * 0.8),
        ("cost_critical", lambda s: s.token_cost > s.budget),
        ("loop_detected", lambda s: s.repeated_outputs > 3),
        ("error_rate_high", lambda s: s.error_count / max(s.iteration_count, 1) > 0.5),
    ]

    async def check(self, session: Session) -> SessionHealth:
        issues = []
        for check_name, check_fn in self.HEALTH_CHECKS:
            try:
                if check_fn(session):
                    issues.append(check_name)
            except Exception:
                pass

        status = "healthy"
        if any("critical" in i for i in issues):
            status = "critical"
        elif issues:
            status = "warning"

        return SessionHealth(
            session_id=session.id,
            status=status,
            issues=issues,
            metrics={
                "iterations": session.iteration_count,
                "tokens": session.token_count,
                "errors": session.error_count,
                "idle_seconds": session.idle_time,
            }
        )

    async def monitor_loop(self, beads: BeadsClient):
        """Background monitoring loop."""
        while True:
            active_sessions = await beads.get_active_sessions()
            for session in active_sessions:
                health = await self.check(session)
                if health.status != "healthy":
                    await self._alert(health)
            await asyncio.sleep(10)
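
The check table can be exercised standalone with a stub session (all values invented):

```python
from types import SimpleNamespace

HEALTH_CHECKS = [
    ("cost_warning", lambda s: s.token_cost > s.budget * 0.8),
    ("cost_critical", lambda s: s.token_cost > s.budget),
    ("error_rate_high", lambda s: s.error_count / max(s.iteration_count, 1) > 0.5),
]

# Stub session: over budget, but acceptable error rate
session = SimpleNamespace(token_cost=120, budget=100, error_count=1, iteration_count=10)

issues = [name for name, check in HEALTH_CHECKS if check(session)]
status = "critical" if any("critical" in i for i in issues) else ("warning" if issues else "healthy")
print(issues, status)
```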

Effort: Medium

Verdict: YES - Proactive health monitoring.


Ideas NOT Worth Adopting

Full PostgreSQL + Redis Stack

Why skip: BLACKICE's SQLite (Beads) is simpler and sufficient.

Multi-Provider Message Normalization

Why skip: BLACKICE already has adapter pattern.


Summary

Feature Worth Adopting? Effort Priority
Background Task Extraction YES Medium High
SOP Generation YES Medium Medium
Artifact-Centric Storage YES Low Medium
Session Health Monitoring YES Medium Medium

References


<!-- Source Gist 13 of 19: 752c3748a1282907105c8e2e233393d2 -->

Planning-with-Files Ideas for BLACKICE


Ideas from Planning with Files for BLACKICE.

What is Planning with Files?

A Claude Code skill implementing persistent markdown-based planning. Uses filesystem as memory to prevent goal drift.

| Aspect | Planning with Files | BLACKICE |
|---|---|---|
| Focus | Persistent task state | Iterate-until-success |
| Memory | Markdown files | Beads event store |
| Pattern | 3-file system | Event replay |
| Inspiration | Manus agent | Ralph Loop |

Key Features

  1. Filesystem as Memory - Files persist state, not context window
  2. 3-File Pattern - task_plan.md, notes.md, deliverable
  3. Attention Recovery - Re-read plan before decisions
  4. Append-Only Notes - Never modify historical entries
  5. Goal Tracking - Checkbox-based progress visibility

Ideas Worth Adopting

1. Forced Attention Recovery

What it is: Re-read objectives before every major decision.

Current BLACKICE approach: Hope the agent remembers its goals.

Why adopt: Prevents drift over long sessions. Manus uses this for 50+ tool calls.

Implementation sketch:

class AttentionManager:
    """Force agent to re-read objectives periodically."""

    def __init__(self, task: Task, interval: int = 5):
        self.task = task
        self.interval = interval  # Re-read every N tool calls
        self.tool_call_count = 0
        self.plan_path = Path(f".agent/{task.id}/task_plan.md")

    def before_tool_call(self, tool: str, args: dict) -> str | None:
        """Check if we need attention recovery."""
        self.tool_call_count += 1

        if self.tool_call_count % self.interval == 0:
            return self._get_attention_prompt()
        return None

    def _get_attention_prompt(self) -> str:
        plan = self.plan_path.read_text() if self.plan_path.exists() else ""
        return f"""
⚠️ ATTENTION CHECK (call #{self.tool_call_count})

Before proceeding, re-read your objectives:

{plan}

Current phase: {self._get_current_phase(plan)}
Remaining tasks: {self._count_remaining(plan)}

Continue with your next action, keeping these objectives in mind.
"""

    def _get_current_phase(self, plan: str) -> str:
        # Find first unchecked phase
        for line in plan.split("\n"):
            if line.startswith("- [ ]"):
                return line.replace("- [ ]", "").strip()
        return "All phases complete"

    def _count_remaining(self, plan: str) -> int:
        return plan.count("- [ ]")

# Usage in execution loop
attention = AttentionManager(task)

for tool_call in agent.tool_calls:
    attention_prompt = attention.before_tool_call(tool_call.tool, tool_call.args)
    if attention_prompt:
        await agent.inject_context(attention_prompt)

    await execute_tool(tool_call)
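
The plan-parsing helpers reduce to a few lines of string handling; a standalone check with an invented plan:

```python
plan = """# Task Plan
- [x] Phase 1: Research
- [ ] Phase 2: Implement
- [ ] Phase 3: Test
"""

def current_phase(plan: str) -> str:
    # First unchecked checkbox is the active phase
    for line in plan.splitlines():
        if line.startswith("- [ ]"):
            return line.replace("- [ ]", "").strip()
    return "All phases complete"

interval = 5
# Tool-call numbers at which an attention check would fire
triggered = [n for n in range(1, 13) if n % interval == 0]
print(current_phase(plan), plan.count("- [ ]"), triggered)
```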

Effort: Low

Verdict: YES - Simple, effective drift prevention.


2. 3-File State Pattern

What it is: Separate concerns into plan, notes, and output files.

Current BLACKICE approach: Everything in Beads events.

Why adopt: Human-readable state. Easy debugging. Agent can re-read naturally.

Implementation sketch:

@dataclass
class TaskWorkspace:
    """3-file workspace for task state."""

    task_id: str
    base_path: Path = Path(".agent")

    @property
    def plan_path(self) -> Path:
        return self.base_path / self.task_id / "task_plan.md"

    @property
    def notes_path(self) -> Path:
        return self.base_path / self.task_id / "notes.md"

    @property
    def output_path(self) -> Path:
        return self.base_path / self.task_id / "output.md"

    def init(self, task: Task):
        """Initialize workspace with plan template."""
        self.plan_path.parent.mkdir(parents=True, exist_ok=True)

        plan_template = f"""# Task Plan: {task.name}

## Objective
{task.description}

## Phases
- [ ] Phase 1: Research and understand requirements
- [ ] Phase 2: Design solution approach
- [ ] Phase 3: Implement solution
- [ ] Phase 4: Test and validate
- [ ] Phase 5: Document and deliver

## Success Criteria
{task.success_criteria or "Task completed successfully"}

## Progress Log
<!-- Updated by agent after each phase -->
"""
        self.plan_path.write_text(plan_template)
        self.notes_path.write_text("# Research Notes\n\n")

    def append_note(self, note: str):
        """Append to notes (never modify existing)."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M")
        entry = f"\n## {timestamp}\n{note}\n"
        with self.notes_path.open("a") as f:
            f.write(entry)

    def update_progress(self, phase: int, status: str):
        """Mark a phase checkbox complete, preserving the phase title."""
        lines = self.plan_path.read_text().split("\n")
        prefix = f"- [ ] Phase {phase}:"
        for i, line in enumerate(lines):
            if line.startswith(prefix):
                lines[i] = line.replace("- [ ]", "- [x]", 1) + f" ✅ {status}"
        self.plan_path.write_text("\n".join(lines))

    def get_context_for_agent(self) -> str:
        """Get full context for agent."""
        plan = self.plan_path.read_text() if self.plan_path.exists() else ""
        notes = self.notes_path.read_text() if self.notes_path.exists() else ""

        return f"""
## Current Task Plan
{plan}

## Research Notes (read for context)
{notes}

---
Continue from where you left off. Update the plan as you make progress.
"""

Effort: Low

Verdict: YES - Simple, debuggable state management.


3. Append-Only Notes

What it is: Only append to notes, never modify history.

Current BLACKICE approach: Event store is append-only.

Why adopt: Audit trail. No lost information. Easy to follow timeline.

Implementation sketch:

class AppendOnlyLog:
    """Append-only log with structured entries."""

    def __init__(self, path: Path):
        self.path = path

    def append(self, entry_type: str, content: str, metadata: dict | None = None):
        """Append entry with timestamp and type; existing entries are never modified."""
        timestamp = datetime.now().isoformat()

        with self.path.open("a") as f:
            f.write("\n---\n")
            f.write(f"**[{timestamp}]** `{entry_type}`\n\n")
            f.write(content)
            f.write("\n")
            if metadata:
                f.write(f"\n_metadata: {metadata}_\n")

    def find_entries(self, entry_type: str) -> list[str]:
        """Find all entries of a type."""
        text = self.path.read_text()
        entries = []
        for section in text.split("\n---\n"):
            if f"`{entry_type}`" in section:
                entries.append(section)
        return entries

# Usage
log = AppendOnlyLog(Path(".agent/task-123/notes.md"))

log.append("discovery", "Found that the API requires auth token in header")
log.append("decision", "Will use OAuth2 client credentials flow")
log.append("blocker", "API rate limit hit, waiting 60 seconds")
log.append("resolution", "Implemented retry with exponential backoff")

Effort: Low

Verdict: YES - Beads is already append-only; apply the same discipline to workspace notes.


4. Conditional Activation

What it is: Only use structured planning for complex tasks (3+ steps).

Current BLACKICE approach: Same process for all tasks.

Why adopt: Don't over-engineer simple tasks. Save overhead for complex ones.

Implementation sketch:

class TaskComplexityDetector:
    """Detect if task needs structured planning."""

    COMPLEXITY_INDICATORS = [
        r"multiple\s+files?",
        r"several\s+steps?",
        r"refactor",
        r"migrate",
        r"integrate",
        r"implement.*feature",
        r"debug.*complex",
        r"across.*modules?",
    ]

    SIMPLE_INDICATORS = [
        r"fix\s+typo",
        r"update\s+version",
        r"add\s+comment",
        r"rename",
        r"simple\s+change",
    ]

    def needs_structured_planning(self, task: Task) -> bool:
        """Check if task needs full planning infrastructure."""
        description = task.description.lower()

        # Check for simple task indicators
        for pattern in self.SIMPLE_INDICATORS:
            if re.search(pattern, description):
                return False

        # Check for complexity indicators
        complexity_score = 0
        for pattern in self.COMPLEXITY_INDICATORS:
            if re.search(pattern, description):
                complexity_score += 1

        return complexity_score >= 2

# Usage in flywheel
detector = TaskComplexityDetector()

if detector.needs_structured_planning(task):
    workspace = TaskWorkspace(task.id)
    workspace.init(task)
    await run_with_planning(task, workspace)
else:
    await run_simple(task)

Effort: Low

Verdict: YES - Don't over-engineer simple tasks.


Ideas NOT Worth Adopting

Manual File Editing

Why skip: BLACKICE should manage state automatically.

Manus-Specific Patterns

Why skip: Some patterns are specific to Manus's architecture.


Summary

| Feature | Worth Adopting? | Effort | Priority |
|---|---|---|---|
| Forced Attention Recovery | YES | Low | High |
| 3-File State Pattern | YES | Low | Medium |
| Append-Only Notes | YES | Low | Medium |
| Conditional Activation | YES | Low | Low |

References


<!-- Source Gist 14 of 19: 4de321819ca80dc51ca0d5f6ce0926db -->

Petit Ideas for BLACKICE


Ideas from Petit for BLACKICE.

What is Petit?

A lightweight Rust task scheduler with DAG execution, designed for embedded/minimal environments.

| Aspect | Petit | BLACKICE |
|---|---|---|
| Focus | Task scheduling with dependencies | Iterate-until-success |
| Language | Rust | Python |
| Execution | DAG topological sort | Sequential + parallel |
| State | SQLite or in-memory | Beads event store |

Key Features

  1. DAG Dependency Resolution - Topological sort for execution order
  2. Conditional Execution - all_success, on_failure, all_done
  3. Cron Scheduling - 6-field timezone-aware expressions
  4. Concurrency Limits - Max tasks/jobs to prevent exhaustion
  5. Pluggable Storage - SQLite or in-memory backends

Ideas Worth Adopting

1. Conditional Execution Semantics

What it is: Tasks specify when they should run based on dependency status.

Current BLACKICE approach: Tasks run when dependencies complete (success only).

Why adopt: Handle failure paths gracefully. Run cleanup on failure.

Implementation sketch:

from dataclasses import dataclass
from enum import Enum

class ExecutionCondition(Enum):
    ALL_SUCCESS = "all_success"   # Run only if all deps succeeded
    ALL_DONE = "all_done"         # Run when all deps done (success or fail)
    ALL_FAILED = "all_failed"     # Run only if all deps failed
    ANY_SUCCESS = "any_success"   # Run if any dep succeeded
    ANY_FAILED = "any_failed"     # Run if any dep failed
    ALWAYS = "always"             # Always run regardless

@dataclass
class TaskNode:
    id: str
    name: str
    depends_on: list[str]
    condition: ExecutionCondition = ExecutionCondition.ALL_SUCCESS

    def should_run(self, dep_results: dict[str, TaskResult]) -> bool:
        if not self.depends_on:
            return True

        dep_statuses = [dep_results[d].status for d in self.depends_on]

        match self.condition:
            case ExecutionCondition.ALL_SUCCESS:
                return all(s == "success" for s in dep_statuses)
            case ExecutionCondition.ALL_DONE:
                return all(s in ("success", "failed") for s in dep_statuses)
            case ExecutionCondition.ALL_FAILED:
                return all(s == "failed" for s in dep_statuses)
            case ExecutionCondition.ANY_SUCCESS:
                return any(s == "success" for s in dep_statuses)
            case ExecutionCondition.ANY_FAILED:
                return any(s == "failed" for s in dep_statuses)
            case ExecutionCondition.ALWAYS:
                return True

# Example: Cleanup task runs on failure
cleanup_task = TaskNode(
    id="cleanup",
    name="Cleanup on failure",
    depends_on=["deploy"],
    condition=ExecutionCondition.ANY_FAILED
)

# Example: Notification runs always
notify_task = TaskNode(
    id="notify",
    name="Send completion notification",
    depends_on=["deploy", "cleanup"],
    condition=ExecutionCondition.ALL_DONE
)
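
The condition semantics can be checked standalone; a compact evaluator over dependency statuses, following the same rules as `should_run` above:

```python
def should_run(condition: str, statuses: list[str]) -> bool:
    if not statuses:
        return True  # No dependencies: always runnable
    return {
        "all_success": all(s == "success" for s in statuses),
        "all_done": all(s in ("success", "failed") for s in statuses),
        "any_failed": any(s == "failed" for s in statuses),
        "always": True,
    }[condition]

# Deploy failed: the cleanup task fires, the strict follow-up does not
statuses = ["failed"]
print(should_run("any_failed", statuses), should_run("all_success", statuses))
```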

Effort: Low

Verdict: YES - Essential for robust workflows.


2. Concurrency Limits

What it is: Max simultaneous tasks to prevent resource exhaustion.

Current BLACKICE approach: No explicit limits.

Why adopt: Don't overwhelm GPU, API rate limits, or memory.

Implementation sketch:

import asyncio
from dataclasses import dataclass
from typing import Callable

@dataclass
class ConcurrencyConfig:
    max_total_tasks: int = 10      # Global limit
    max_tasks_per_job: int = 5     # Per-workflow limit
    max_tasks_per_model: dict[str, int] | None = None  # Per-model limits

    def __post_init__(self):
        if self.max_tasks_per_model is None:
            self.max_tasks_per_model = {
                "claude-opus": 2,      # Expensive, limit concurrency
                "claude-sonnet": 5,
                "ollama/qwen": 10,     # Local, can run more
            }

class ConcurrencyLimiter:
    def __init__(self, config: ConcurrencyConfig):
        self.config = config
        self._global_semaphore = asyncio.Semaphore(config.max_total_tasks)
        self._model_semaphores: dict[str, asyncio.Semaphore] = {}
        self._job_semaphores: dict[str, asyncio.Semaphore] = {}

    def _get_model_semaphore(self, model: str) -> asyncio.Semaphore:
        if model not in self._model_semaphores:
            limit = self.config.max_tasks_per_model.get(model, 5)
            self._model_semaphores[model] = asyncio.Semaphore(limit)
        return self._model_semaphores[model]

    def _get_job_semaphore(self, job_id: str) -> asyncio.Semaphore:
        if job_id not in self._job_semaphores:
            self._job_semaphores[job_id] = asyncio.Semaphore(self.config.max_tasks_per_job)
        return self._job_semaphores[job_id]

    async def acquire(self, task: Task):
        """Acquire all required semaphores."""
        await self._global_semaphore.acquire()
        await self._get_model_semaphore(task.model).acquire()
        await self._get_job_semaphore(task.job_id).acquire()

    def release(self, task: Task):
        """Release all semaphores."""
        self._get_job_semaphore(task.job_id).release()
        self._get_model_semaphore(task.model).release()
        self._global_semaphore.release()

    async def run_with_limits(self, task: Task, executor: Callable):
        """Execute task within concurrency limits."""
        await self.acquire(task)
        try:
            return await executor(task)
        finally:
            self.release(task)
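
The core of the limiter is `asyncio.Semaphore`; a minimal demonstration that six tasks never exceed a concurrency of two:

```python
import asyncio

async def main() -> int:
    sem = asyncio.Semaphore(2)  # max 2 concurrent "tasks"
    running = 0
    peak = 0

    async def worker():
        nonlocal running, peak
        async with sem:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # simulate work
            running -= 1

    await asyncio.gather(*(worker() for _ in range(6)))
    return peak

peak = asyncio.run(main())
print(peak)
```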

Effort: Low

Verdict: YES - Essential for production.


3. Cross-Job Dependencies

What it is: Tasks in one workflow can depend on tasks in another.

Current BLACKICE approach: Dependencies only within a single job.

Why adopt: Complex projects need cross-workflow coordination.

Implementation sketch:

@dataclass
class TaskRef:
    job_id: str
    task_id: str

    def __str__(self):
        return f"{self.job_id}:{self.task_id}"

@dataclass
class TaskNode:
    id: str
    job_id: str
    depends_on: list[TaskRef]  # Can reference other jobs
    condition: ExecutionCondition = ExecutionCondition.ALL_SUCCESS
    # should_run(dep_results) as defined in the conditional-execution sketch above

class CrossJobExecutor:
    """Execute tasks with cross-job dependencies."""

    def __init__(self):
        self.results: dict[str, TaskResult] = {}  # "job:task" -> result

    async def execute_task(self, task: TaskNode):
        # Wait for all dependencies (even from other jobs)
        for dep in task.depends_on:
            dep_key = str(dep)
            while dep_key not in self.results:
                await asyncio.sleep(0.1)  # simple polling; a per-dependency asyncio.Event would avoid busy-waiting

        # Check if should run based on dep results
        dep_results = {str(d): self.results[str(d)] for d in task.depends_on}
        if not task.should_run(dep_results):
            self.results[f"{task.job_id}:{task.id}"] = TaskResult(status="skipped")
            return

        # Execute
        result = await self._run(task)
        self.results[f"{task.job_id}:{task.id}"] = result

# Example: Deploy job depends on build job
build_task = TaskNode(
    id="compile",
    job_id="build",
    depends_on=[]
)

deploy_task = TaskNode(
    id="deploy",
    job_id="deploy",
    depends_on=[TaskRef("build", "compile")]  # Cross-job dependency
)

Effort: Medium

Verdict: YES - Useful for complex workflows.


4. Fixed-Delay vs Exponential Backoff Option

What it is: Choice between simple fixed-delay retries or exponential backoff.

Current BLACKICE approach: Exponential backoff only.

Why adopt: Some tasks benefit from fixed delay (e.g., waiting for external service).

Implementation sketch:

from dataclasses import dataclass
from enum import Enum

class RetryStrategy(Enum):
    FIXED_DELAY = "fixed"
    EXPONENTIAL = "exponential"
    LINEAR = "linear"

@dataclass
class RetryConfig:
    strategy: RetryStrategy
    max_retries: int
    base_delay: float  # seconds
    max_delay: float = 300  # cap for exponential

    def get_delay(self, attempt: int) -> float:
        match self.strategy:
            case RetryStrategy.FIXED_DELAY:
                return self.base_delay
            case RetryStrategy.EXPONENTIAL:
                delay = self.base_delay * (2 ** attempt)
                return min(delay, self.max_delay)
            case RetryStrategy.LINEAR:
                delay = self.base_delay * (attempt + 1)
                return min(delay, self.max_delay)

# Task-specific retry configs
RETRY_CONFIGS = {
    "api_call": RetryConfig(RetryStrategy.EXPONENTIAL, max_retries=5, base_delay=1),
    "file_wait": RetryConfig(RetryStrategy.FIXED_DELAY, max_retries=60, base_delay=1),
    "build": RetryConfig(RetryStrategy.LINEAR, max_retries=3, base_delay=10),
}
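
The delay schedules produced by `get_delay` can be tabulated standalone (same formulas as above):

```python
def delays(strategy: str, base: float, retries: int, cap: float = 300) -> list[float]:
    out = []
    for attempt in range(retries):
        if strategy == "fixed":
            d = base
        elif strategy == "exponential":
            d = base * (2 ** attempt)
        else:  # linear
            d = base * (attempt + 1)
        out.append(min(d, cap))  # cap applies to exponential and linear growth
    return out

print(delays("exponential", 1, 5))
print(delays("fixed", 1, 3))
print(delays("linear", 10, 3))
```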

Effort: Low

Verdict: YES - Flexibility is good.


Ideas NOT Worth Adopting

Rust Core

Why skip: BLACKICE is Python. Don't fragment the stack.

Cron Scheduling

Why skip: BLACKICE is event-driven, not scheduled.

Embedded Focus

Why skip: BLACKICE targets GPUs, not embedded systems.


Summary

| Feature | Worth Adopting? | Effort | Priority |
|---|---|---|---|
| Conditional Execution | YES | Low | High |
| Concurrency Limits | YES | Low | High |
| Cross-Job Dependencies | YES | Medium | Medium |
| Flexible Retry Strategies | YES | Low | Low |

References


<!-- Source Gist 15 of 19: 5f4cb9ddbde4f88559f4bfb2df27d99f -->

Plannotator Ideas for BLACKICE


Ideas from Plannotator for BLACKICE.

What is Plannotator?

A visual plan review system where humans annotate AI plans (delete, insert, replace) before approval.

| Aspect | Plannotator | BLACKICE |
|---|---|---|
| Focus | Visual plan annotation | Iterate-until-success |
| Interface | Browser UI | CLI |
| Feedback | Structured annotations | Success/failure |
| License | BSL 1.1 (restrictive) | MIT |

Key Features

  1. Visual Plan Markup - Delete, insert, replace, comment operations
  2. Image Attachments - Drawing tools for UI mockups
  3. Structured Feedback Format - Machine-readable annotations
  4. Auto-Save - Export to Obsidian/Bear Notes
  5. Plugin Architecture - Works with Claude Code and OpenCode

Ideas Worth Adopting

1. Structured Feedback Format

What it is: Human annotations converted to machine-readable format.

Current BLACKICE approach: Unstructured user feedback.

Why adopt: Agents can parse and act on structured feedback precisely.

Implementation sketch:

from dataclasses import dataclass
from enum import Enum
from typing import Literal

class AnnotationType(Enum):
    DELETE = "delete"
    INSERT = "insert"
    REPLACE = "replace"
    COMMENT = "comment"
    APPROVE = "approve"
    REJECT = "reject"

@dataclass
class Annotation:
    type: AnnotationType
    target: str              # What's being annotated
    line_start: int | None
    line_end: int | None
    content: str | None      # New content for insert/replace
    comment: str | None      # Human explanation

@dataclass
class AnnotatedPlan:
    original_plan: str
    annotations: list[Annotation]
    overall_status: Literal["approved", "needs_changes", "rejected"]
    summary: str

    def apply_annotations(self) -> str:
        """Apply annotations to generate revised plan."""
        lines = self.original_plan.split("\n")

        # Sort by line number descending (apply from bottom up)
        sorted_annotations = sorted(
            [a for a in self.annotations if a.line_start],
            key=lambda a: a.line_start,
            reverse=True
        )

        for annotation in sorted_annotations:
            match annotation.type:
                case AnnotationType.DELETE:
                    del lines[annotation.line_start:annotation.line_end]
                case AnnotationType.INSERT:
                    lines.insert(annotation.line_start, annotation.content)
                case AnnotationType.REPLACE:
                    lines[annotation.line_start:annotation.line_end] = [annotation.content]

        return "\n".join(lines)

    def to_agent_prompt(self) -> str:
        """Convert annotations to prompt for agent."""
        if self.overall_status == "approved":
            return "Plan approved. Proceed with implementation."

        feedback = ["Human feedback on your plan:", ""]

        for a in self.annotations:
            match a.type:
                case AnnotationType.DELETE:
                    feedback.append(f"❌ DELETE lines {a.line_start}-{a.line_end}: {a.comment or 'Remove this'}")
                case AnnotationType.INSERT:
                    feedback.append(f"➕ INSERT at line {a.line_start}: {a.content}")
                case AnnotationType.REPLACE:
                    feedback.append(f"🔄 REPLACE lines {a.line_start}-{a.line_end} with: {a.content}")
                case AnnotationType.COMMENT:
                    feedback.append(f"💬 COMMENT on lines {a.line_start}-{a.line_end}: {a.comment}")

        feedback.append("")
        feedback.append("Please revise your plan based on this feedback.")

        return "\n".join(feedback)
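
The bottom-up ordering in `apply_annotations` is the load-bearing detail: applying edits from the highest line number down means earlier indices are never shifted by a later edit. A minimal, self-contained demo (all data here is illustrative):

```python
# Two hypothetical edits against a four-line document.
edits = [
    (1, "delete", None),        # delete line index 1
    (2, "replace", "LINE C"),   # replace line index 2
]

lines = ["line A", "line B", "line C", "line D"]

# Apply from the bottom up so index 1 is still valid after editing index 2.
for index, kind, content in sorted(edits, key=lambda e: e[0], reverse=True):
    if kind == "delete":
        del lines[index]
    elif kind == "replace":
        lines[index] = content

print(lines)  # → ['line A', 'LINE C', 'line D']
```

Applying top-down instead would delete index 1 first and shift the replace target onto the wrong line.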

Effort: Medium

Verdict: YES - Clear feedback format.


2. Async Human-in-the-Loop

What it is: Decouple plan generation from execution via browser review.

Current BLACKICE approach: Synchronous consensus voting.

Why adopt: Human review doesn't block agents. Review when convenient.

Implementation sketch:

import asyncio
from dataclasses import dataclass
from datetime import datetime
from uuid import uuid4

class ReviewTimeoutError(TimeoutError):
    """Raised when no human review arrives before the timeout."""

@dataclass
class PendingReview:
    id: str
    plan: str
    submitted_at: datetime
    reviewed: bool = False
    annotations: AnnotatedPlan | None = None

class AsyncReviewQueue:
    """Queue plans for async human review."""

    def __init__(self):
        self.pending: dict[str, PendingReview] = {}
        self._review_events: dict[str, asyncio.Event] = {}

    async def submit_for_review(self, plan: str, timeout: float = 3600) -> AnnotatedPlan:
        """Submit plan and wait for human review."""
        review_id = str(uuid4())
        self.pending[review_id] = PendingReview(
            id=review_id,
            plan=plan,
            submitted_at=datetime.now()
        )
        self._review_events[review_id] = asyncio.Event()

        # Notify human (webhook, email, desktop notification)
        await self._notify_reviewer(review_id, plan)

        # Wait for review (with timeout)
        try:
            await asyncio.wait_for(
                self._review_events[review_id].wait(),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            raise ReviewTimeoutError(f"Review {review_id} timed out")

        return self.pending[review_id].annotations

    async def complete_review(self, review_id: str, annotations: AnnotatedPlan):
        """Human completes review via API."""
        if review_id not in self.pending:
            raise ValueError(f"Unknown review: {review_id}")

        self.pending[review_id].annotations = annotations
        self.pending[review_id].reviewed = True
        self._review_events[review_id].set()

# Web API for human review
@app.post("/api/reviews/{review_id}")
async def submit_review(review_id: str, annotations: AnnotatedPlan):
    await review_queue.complete_review(review_id, annotations)
    return {"status": "received"}
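
The core handshake above is just an `asyncio.Event` shared between submitter and reviewer. A dependency-free toy version (class and names are illustrative, not BLACKICE API):

```python
import asyncio

class MiniReviewQueue:
    """Submitter awaits an event; the reviewer sets it with a verdict."""

    def __init__(self):
        self.results: dict[str, str] = {}
        self.events: dict[str, asyncio.Event] = {}

    async def submit(self, review_id: str, plan: str, timeout: float = 1.0) -> str:
        self.events[review_id] = asyncio.Event()
        await asyncio.wait_for(self.events[review_id].wait(), timeout)
        return self.results[review_id]

    def complete(self, review_id: str, verdict: str) -> None:
        self.results[review_id] = verdict
        self.events[review_id].set()

async def main() -> str:
    queue = MiniReviewQueue()
    # Simulate a human completing the review shortly after submission.
    asyncio.get_running_loop().call_later(0.01, queue.complete, "r1", "approved")
    return await queue.submit("r1", "my plan")

verdict = asyncio.run(main())
print(verdict)  # → approved
```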

Effort: Medium

Verdict: YES - Better UX for human review.


3. Visual Plan Editing

What it is: Web UI for plan manipulation with visual tools.

Current BLACKICE approach: CLI only.

Why adopt: Non-technical stakeholders can review AI plans visually.

Implementation sketch:

# Backend API for plan review UI
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles

app = FastAPI()

@app.get("/api/plans/{plan_id}")
async def get_plan(plan_id: str):
    """Get plan for review."""
    plan = await beads.get_plan(plan_id)
    return {
        "id": plan_id,
        "content": plan.content,
        "lines": plan.content.split("\n"),
        "metadata": plan.metadata
    }

@app.post("/api/plans/{plan_id}/annotations")
async def save_annotations(plan_id: str, annotations: list[Annotation]):
    """Save annotations from UI."""
    await beads.save_annotations(plan_id, annotations)
    return {"status": "saved"}

# Mount React/Vue UI
app.mount("/", StaticFiles(directory="ui/dist", html=True))

Effort: High (requires frontend)

Verdict: MAYBE - Nice but CLI may be sufficient.


Ideas NOT Worth Adopting

BSL License

Why skip: Business Source License restricts commercial use. BLACKICE is MIT.

Obsidian/Bear Integration

Why skip: Too niche. General file export is sufficient.


Summary

| Feature | Worth Adopting? | Effort | Priority |
|---|---|---|---|
| Structured Feedback Format | YES | Medium | High |
| Async Human-in-the-Loop | YES | Medium | Medium |
| Visual Plan Editing | MAYBE | High | Low |


<!-- Source Gist 16 of 19: 5d430f8cf367b9f1e02b660d7edae31f -->

Wayfound MCP Supervisor Ideas for BLACKICE

Wayfound MCP Supervisor Ideas for BLACKICE

Ideas from Wayfound MCP Supervisor for BLACKICE.

What is Wayfound?

AI supervision for agentic systems via Model Context Protocol. Agents query guidelines, receive feedback, and iterate until quality thresholds are met.

| Aspect | Wayfound | BLACKICE |
|---|---|---|
| Focus | Quality supervision & grading | Iterate-until-success |
| Integration | MCP (SSE) | Direct API calls |
| Feedback | Letter grades (A-F) | Success/failure |
| Learning | Historical session analysis | Beads + Reflexion |

Key Features

  1. Pre-Execution Guidance - Query guidelines before starting work
  2. Pitfall Identification - Learn from common issues in past sessions
  3. Graded Evaluation - Letter grades with detailed breakdown
  4. Iterative Refinement - Loop until grade ≥ A-
  5. Session Transparency - Full breakdown of what passed/failed

Ideas Worth Adopting

1. Pre-Execution Guidelines Query

What it is: Before starting work, agent queries for relevant guidelines.

Current BLACKICE approach: Guidelines baked into prompts.

Why adopt: Dynamic guidelines that evolve. Don't update prompts for every rule change.

Implementation sketch:

import sqlite3
from dataclasses import dataclass
from pathlib import Path
from typing import Literal

@dataclass
class Guideline:
    id: str
    category: str
    rule: str
    severity: Literal["must", "should", "may"]
    examples: list[str]

class GuidelinesStore:
    """Store and retrieve coding guidelines."""

    def __init__(self, db_path: Path):
        self.db = sqlite3.connect(db_path)
        self.db.row_factory = sqlite3.Row  # rows behave like mappings

    def get_for_task(self, task_type: str, language: str) -> list[Guideline]:
        """Get relevant guidelines for task."""
        # Parenthesize the OR clauses: AND binds tighter than OR in SQL.
        query = """
            SELECT * FROM guidelines
            WHERE (task_type = ? OR task_type = 'all')
              AND (language = ? OR language = 'all')
            ORDER BY severity DESC
        """
        rows = self.db.execute(query, (task_type, language)).fetchall()
        return [Guideline(**dict(row)) for row in rows]

    def format_for_prompt(self, guidelines: list[Guideline]) -> str:
        """Format guidelines for agent prompt."""
        sections = {"must": [], "should": [], "may": []}
        for g in guidelines:
            sections[g.severity].append(f"- {g.rule}")

        return f"""
## Coding Guidelines

### MUST (Required)
{chr(10).join(sections['must'])}

### SHOULD (Recommended)
{chr(10).join(sections['should'])}

### MAY (Optional)
{chr(10).join(sections['may'])}
"""

# Usage before task execution
guidelines = store.get_for_task("code_review", "python")
guidelines_prompt = store.format_for_prompt(guidelines)
full_prompt = f"{guidelines_prompt}\n\n{task.description}"
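
The severity grouping in `format_for_prompt` reduces to a bucket-then-join pass. A self-contained sketch with invented guideline data:

```python
# Hypothetical guidelines as (severity, rule) pairs.
guidelines = [
    ("must", "All public functions need type hints"),
    ("should", "Prefer pathlib over os.path"),
    ("must", "No bare except clauses"),
]

# Bucket rules by severity, preserving insertion order.
sections: dict[str, list[str]] = {"must": [], "should": [], "may": []}
for severity, rule in guidelines:
    sections[severity].append(f"- {rule}")

# Emit only non-empty sections.
prompt = "\n\n".join(
    f"### {severity.upper()}\n" + "\n".join(rules)
    for severity, rules in sections.items() if rules
)
print(prompt)
```

Empty severity levels ("may" here) are skipped so the agent prompt stays compact.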

Effort: Low-Medium

Verdict: YES - Dynamic, maintainable guidelines.


2. Letter Grade Evaluation

What it is: Grade agent outputs A-F with clear thresholds.

Current BLACKICE approach: Binary success/failure.

Why adopt: Nuanced feedback. "C" is different from "F". Enables quality thresholds.

Implementation sketch:

from dataclasses import dataclass
from enum import Enum

class Grade(Enum):
    A = 4.0
    A_MINUS = 3.7
    B_PLUS = 3.3
    B = 3.0
    B_MINUS = 2.7
    C_PLUS = 2.3
    C = 2.0
    C_MINUS = 1.7
    D = 1.0
    F = 0.0

@dataclass
class Evaluation:
    grade: Grade
    breakdown: dict[str, float]  # criterion -> score
    feedback: str
    passed: bool

    @classmethod
    def from_scores(cls, scores: dict[str, float], threshold: Grade = Grade.B) -> "Evaluation":
        avg = sum(scores.values()) / len(scores)
        grade = cls._score_to_grade(avg)
        return cls(
            grade=grade,
            breakdown=scores,
            feedback=cls._generate_feedback(scores),
            passed=grade.value >= threshold.value
        )

    @staticmethod
    def _score_to_grade(score: float) -> Grade:
        if score >= 0.95: return Grade.A
        if score >= 0.90: return Grade.A_MINUS
        if score >= 0.85: return Grade.B_PLUS
        if score >= 0.80: return Grade.B
        if score >= 0.75: return Grade.B_MINUS
        if score >= 0.70: return Grade.C_PLUS
        if score >= 0.65: return Grade.C
        if score >= 0.60: return Grade.C_MINUS
        if score >= 0.50: return Grade.D
        return Grade.F

    @staticmethod
    def _generate_feedback(scores: dict[str, float]) -> str:
        """Minimal feedback: point at the weakest criterion."""
        worst = min(scores, key=scores.get)
        return f"Improve {worst} first (scored {scores[worst]:.2f})"

class OutputEvaluator:
    """Evaluate agent outputs with letter grades."""

    CRITERIA = [
        "correctness",      # Does it work?
        "completeness",     # Is it done?
        "code_quality",     # Is it clean?
        "test_coverage",    # Is it tested?
        "documentation",    # Is it documented?
    ]

    async def evaluate(self, output: AgentOutput) -> Evaluation:
        scores = {}
        for criterion in self.CRITERIA:
            scores[criterion] = await self._score_criterion(output, criterion)
        return Evaluation.from_scores(scores)

    async def _score_criterion(self, output: AgentOutput, criterion: str) -> float:
        # Use another LLM to evaluate
        prompt = f"Score this {criterion} from 0 to 1:\n{output.code}"
        score_str = await self.evaluator_llm.run(prompt)
        return float(score_str)
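
The threshold table above can be checked in isolation. This sketch restates the same cutoffs as data (grade labels and sample scores are illustrative):

```python
# (minimum score, grade) pairs, mirroring _score_to_grade above.
GRADES = [
    (0.95, "A"), (0.90, "A-"), (0.85, "B+"), (0.80, "B"), (0.75, "B-"),
    (0.70, "C+"), (0.65, "C"), (0.60, "C-"), (0.50, "D"), (0.0, "F"),
]

def score_to_grade(score: float) -> str:
    # First cutoff the score clears wins; list is sorted descending.
    for cutoff, grade in GRADES:
        if score >= cutoff:
            return grade
    return "F"

# The final grade is driven by the average of per-criterion scores.
scores = {"correctness": 0.9, "completeness": 0.96, "tests": 0.9}
avg = sum(scores.values()) / len(scores)
print(score_to_grade(avg))  # → A-
```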

Effort: Medium

Verdict: YES - Better than binary pass/fail.


3. Iterate Until Quality Threshold

What it is: Keep improving until output meets grade threshold.

Current BLACKICE approach: Iterate until success or max iterations.

Why adopt: "Success" is vague. Grade threshold is measurable.

Implementation sketch:

import json

class QualityGatedLoop:
    """Iterate until quality threshold met."""

    def __init__(
        self,
        evaluator: OutputEvaluator,
        threshold: Grade = Grade.A_MINUS,
        max_iterations: int = 5
    ):
        self.evaluator = evaluator
        self.threshold = threshold
        self.max_iterations = max_iterations

    async def run(self, agent: Agent, task: Task) -> tuple[AgentOutput, Evaluation]:
        best_output = None
        best_eval = None

        for iteration in range(self.max_iterations):
            # Generate output
            output = await agent.run(task)

            # Evaluate
            evaluation = await self.evaluator.evaluate(output)

            # Track best
            if best_eval is None or evaluation.grade.value > best_eval.grade.value:
                best_output = output
                best_eval = evaluation

            # Check threshold
            if evaluation.passed:
                return output, evaluation

            # Generate improvement prompt
            improvement_prompt = f"""
Your previous output received grade: {evaluation.grade.name}

Breakdown:
{json.dumps(evaluation.breakdown, indent=2)}

Feedback: {evaluation.feedback}

Please improve your output to achieve at least {self.threshold.name}.
Focus on the lowest-scoring criteria.
"""
            task = Task(
                description=f"{task.description}\n\n{improvement_prompt}",
                id=task.id
            )

        # Return best even if threshold not met
        return best_output, best_eval
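
Stripped of agents and evaluators, the loop is "keep the best so far, stop early once the threshold is cleared". A toy synchronous version, with a fake score source standing in for the LLM evaluator:

```python
THRESHOLD = 0.9

def fake_score(iteration: int) -> float:
    # Stand-in for OutputEvaluator: quality improves each iteration.
    return 0.5 + 0.15 * iteration

best = 0.0
iterations_used = 0
for i in range(5):
    score = fake_score(i)
    best = max(best, score)          # track best even if threshold is never met
    iterations_used = i + 1
    if score >= THRESHOLD:           # early exit on passing grade
        break

print(iterations_used, best)
```

With these numbers the loop stops after four iterations instead of exhausting the budget; if the threshold were never reached, `best` would still hold the strongest attempt.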

Effort: Medium

Verdict: YES - Quality-driven iteration is better.


4. Common Pitfall Analysis

What it is: Analyze past sessions to identify frequent issues.

Current BLACKICE approach: Reflexion learns from failures.

Why adopt: Proactive prevention. Show pitfalls BEFORE agent makes them.

Implementation sketch:

from collections import Counter
from dataclasses import dataclass

@dataclass
class Pitfall:
    issue: str
    frequency: float  # Percentage of sessions with this issue
    prevention: str   # How to avoid it
    example: str | None

class PitfallAnalyzer:
    """Analyze past sessions for common issues."""

    def __init__(self, beads: BeadsClient):
        self.beads = beads

    async def analyze_history(self, task_type: str, limit: int = 100) -> list[Pitfall]:
        """Find common pitfalls from past sessions."""
        # Get past sessions with failures
        sessions = await self.beads.query(
            event_type="task_failed",
            task_type=task_type,
            limit=limit
        )

        # Count issue types
        issue_counts = Counter()
        for session in sessions:
            issues = self._extract_issues(session)
            issue_counts.update(issues)

        # Convert to pitfalls
        total = len(sessions)
        pitfalls = []
        for issue, count in issue_counts.most_common(10):
            pitfalls.append(Pitfall(
                issue=issue,
                frequency=count / total,
                prevention=self._get_prevention(issue),
                example=self._get_example(issue, sessions)
            ))

        return pitfalls

    def format_for_prompt(self, pitfalls: list[Pitfall]) -> str:
        """Format pitfalls as warning for agent."""
        if not pitfalls:
            return ""

        lines = ["## Common Pitfalls to Avoid", ""]
        for p in pitfalls:
            lines.append(f"- **{p.issue}** ({p.frequency:.0%} of past attempts)")
            lines.append(f"  Prevention: {p.prevention}")

        return "\n".join(lines)
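
The frequency computation at the heart of `analyze_history` is a `Counter` over issue labels divided by session count. A self-contained demo with invented session data:

```python
from collections import Counter

# Each inner list holds the issue labels extracted from one failed session.
sessions = [
    ["missing-tests", "hardcoded-path"],
    ["missing-tests"],
    ["no-error-handling", "missing-tests"],
    [],
]

counts = Counter(issue for issues in sessions for issue in issues)
total = len(sessions)
frequencies = {issue: n / total for issue, n in counts.most_common()}
print(frequencies["missing-tests"])  # → 0.75
```

"missing-tests" appears in 3 of 4 sessions, so it would head the pitfall warning shown to the next agent.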

Effort: Medium

Verdict: YES - Learn from history proactively.


Ideas NOT Worth Adopting

External SaaS Dependency

Why skip: BLACKICE should work offline. Don't require external service.

SSE-Based Integration

Why skip: Simpler to use direct function calls than SSE streaming.


Summary

| Feature | Worth Adopting? | Effort | Priority |
|---|---|---|---|
| Pre-Execution Guidelines | YES | Low | High |
| Letter Grade Evaluation | YES | Medium | High |
| Quality-Gated Iteration | YES | Medium | Medium |
| Common Pitfall Analysis | YES | Medium | Medium |


<!-- Source Gist 17 of 19: 4d1f6eee5b6f72d8b3f5f89c50a1eece -->

Ralph Orchestrator Ideas for BLACKICE

Ralph Orchestrator Ideas for BLACKICE

Ideas from Ralph Orchestrator for BLACKICE.

What is Ralph Orchestrator?

An autonomous AI agent loop that runs agents against a prompt file until task completion or limits are reached.

| Aspect | Ralph Orchestrator | BLACKICE |
|---|---|---|
| Focus | Iterate until complete | Iterate until success + consensus |
| Language | TypeScript/Python | Python |
| State | Git checkpoints + .agent/ workspace | Beads event store |
| Agents | Claude, Q Chat, Gemini, ACP | Claude, Ollama, Letta |
| Tests | 920+ tests | 18K lines tests |

Key Features

  1. Completion Marker Detection - Check for task completion, not assume success
  2. Git-Based Checkpointing - Async state preservation for recovery
  3. Adaptive Permission Framework - Graduated tool access control
  4. Agent Context Persistence - Scratchpad files maintain state across iterations
  5. Security Masking - Auto-mask API keys in logs

Ideas Worth Adopting

1. Completion Marker Detection

What it is: Explicitly check if agent marked task as complete, don't assume.

Current BLACKICE approach: Check for success/failure, but not explicit completion markers.

Why adopt: Agents should self-report completion status. Clearer than inferring from output.

Implementation sketch:

import re
from dataclasses import dataclass
from typing import Literal

@dataclass
class CompletionMarker:
    status: Literal["complete", "blocked", "in_progress", "failed"]
    reason: str | None
    next_steps: list[str] | None

class CompletionDetector:
    """Detect task completion from agent output."""

    COMPLETION_PATTERNS = {
        "complete": [
            r"✅\s*TASK\s*COMPLETE",
            r"\[DONE\]",
            r"Task completed successfully",
        ],
        "blocked": [
            r"❌\s*BLOCKED",
            r"\[BLOCKED\]",
            r"Cannot proceed.*need",
        ],
        "in_progress": [
            r"🔄\s*IN\s*PROGRESS",
            r"\[WIP\]",
            r"Continuing with",
        ],
    }

    def detect(self, output: str) -> CompletionMarker:
        for status, patterns in self.COMPLETION_PATTERNS.items():
            for pattern in patterns:
                if re.search(pattern, output, re.IGNORECASE):
                    return CompletionMarker(status=status, reason=output[-500:], next_steps=None)

        # Default to in_progress if no marker found
        return CompletionMarker(status="in_progress", reason=None, next_steps=None)

    def require_completion(self, agent: Agent, task: Task) -> str:
        """Force agent to include completion marker."""
        prompt = f"""
{task.description}

IMPORTANT: You MUST end your response with one of these markers:
- ✅ TASK COMPLETE - if the task is fully done
- ❌ BLOCKED: <reason> - if you cannot proceed
- 🔄 IN PROGRESS: <next step> - if more work is needed

Do not end without a marker.
"""
        return prompt
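
The detection step is a first-match regex scan over the pattern table, defaulting to "in_progress". A self-contained cut-down version (patterns and sample outputs are illustrative):

```python
import re

PATTERNS = {
    "complete": [r"\[DONE\]", r"TASK\s*COMPLETE"],
    "blocked": [r"\[BLOCKED\]", r"Cannot proceed"],
}

def detect(output: str) -> str:
    for status, patterns in PATTERNS.items():
        if any(re.search(p, output, re.IGNORECASE) for p in patterns):
            return status
    return "in_progress"  # no marker found

print(detect("All tests pass. task complete"))        # → complete
print(detect("Still refactoring the parser..."))      # → in_progress
```

The case-insensitive match is what lets "task complete" in free prose satisfy the `TASK COMPLETE` marker; a stricter variant could anchor the pattern to the final line.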

Effort: Low

Verdict: YES - Clear completion semantics.


2. Adaptive Permission Framework

What it is: Graduated control over what tools agents can use.

Current BLACKICE approach: All-or-nothing tool access.

Why adopt: Different tasks need different permissions. Don't give file deletion to a documentation agent.

Implementation sketch:

from dataclasses import dataclass, field
from enum import Enum

class PermissionMode(Enum):
    AUTO_APPROVE = "auto_approve"     # Trust agent completely
    ALLOWLIST = "allowlist"           # Only specific tools
    DENYLIST = "denylist"             # Block specific tools
    INTERACTIVE = "interactive"       # Ask human each time
    DENY_ALL = "deny_all"             # Read-only mode

@dataclass
class PermissionPolicy:
    mode: PermissionMode
    allowed_tools: list[str] = field(default_factory=list)
    denied_tools: list[str] = field(default_factory=list)

# Per-role permission policies
ROLE_PERMISSIONS = {
    "explorer": PermissionPolicy(
        mode=PermissionMode.ALLOWLIST,
        allowed_tools=["read_file", "grep", "list_directory", "web_search"]
    ),
    "implementer": PermissionPolicy(
        mode=PermissionMode.DENYLIST,
        denied_tools=["rm", "delete", "drop_database", "format"]
    ),
    "reviewer": PermissionPolicy(
        mode=PermissionMode.ALLOWLIST,
        allowed_tools=["read_file", "grep", "run_tests"]
    ),
    "deployer": PermissionPolicy(
        mode=PermissionMode.INTERACTIVE,  # Human approves each action
    ),
}

class PermissionGuard:
    def __init__(self, policy: PermissionPolicy):
        self.policy = policy

    async def check(self, tool: str, args: dict) -> bool:
        match self.policy.mode:
            case PermissionMode.AUTO_APPROVE:
                return True
            case PermissionMode.DENY_ALL:
                return False
            case PermissionMode.ALLOWLIST:
                return tool in self.policy.allowed_tools
            case PermissionMode.DENYLIST:
                return tool not in self.policy.denied_tools
            case PermissionMode.INTERACTIVE:
                return await self.ask_human(tool, args)
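
The allowlist and denylist branches are simple set-membership tests. A synchronous toy version (role and tool names are illustrative):

```python
def check(mode: str, tool: str, allowed: set[str], denied: set[str]) -> bool:
    """Cut-down policy check covering only the two list-based modes."""
    if mode == "allowlist":
        return tool in allowed
    if mode == "denylist":
        return tool not in denied
    raise ValueError(f"unknown mode: {mode}")

explorer_allowed = {"read_file", "grep", "list_directory"}
print(check("allowlist", "read_file", explorer_allowed, set()))   # → True
print(check("denylist", "rm", set(), {"rm", "drop_database"}))    # → False
```

Note the asymmetry: an allowlist fails closed (unknown tools are denied), while a denylist fails open, which is why the explorer and reviewer roles above use allowlists.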

Effort: Medium

Verdict: YES - Essential for security.


3. Scratchpad Persistence

What it is: Agents maintain notes across iterations in a scratchpad file.

Current BLACKICE approach: Context from Beads events.

Why adopt: Scratchpad is simpler for agent to read/write. Less overhead than event replay.

Implementation sketch:

from datetime import datetime
from pathlib import Path

class AgentScratchpad:
    """Persistent scratchpad for agent notes."""

    def __init__(self, task_id: str):
        self.path = Path(f".agent/{task_id}/scratchpad.md")
        self.path.parent.mkdir(parents=True, exist_ok=True)

    def read(self) -> str:
        if self.path.exists():
            return self.path.read_text()
        return ""

    def append(self, note: str):
        """Append note with timestamp."""
        timestamp = datetime.now().isoformat()
        entry = f"\n## {timestamp}\n{note}\n"
        with self.path.open("a") as f:
            f.write(entry)

    def get_context_prompt(self) -> str:
        """Get scratchpad as context for agent."""
        notes = self.read()
        if not notes:
            return ""
        return f"""
## Previous Notes (from earlier iterations)

{notes}

---
Continue from where you left off.
"""

# Usage in Ralph Loop
scratchpad = AgentScratchpad(task.id)

for iteration in range(max_iterations):
    context = scratchpad.get_context_prompt()
    prompt = f"{context}\n\n{task.description}"

    result = await agent.run(prompt)

    # Agent's notes persist for next iteration
    scratchpad.append(result.notes)
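
The whole mechanism is append-then-read on a single file. A runnable demo against a temporary directory instead of `.agent/` (notes are invented):

```python
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    pad = Path(tmp) / "scratchpad.md"

    # Each iteration appends; nothing is ever overwritten.
    for note in ["Tried approach A, hit import cycle", "Approach B works"]:
        with pad.open("a") as f:
            f.write(f"## note\n{note}\n")

    content = pad.read_text()

print("Approach B works" in content)  # → True
```

Because notes from every earlier iteration survive, the agent re-reads its own dead ends instead of repeating them.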

Effort: Low

Verdict: YES - Simple and effective.


4. Security Masking in Logs

What it is: Automatically redact API keys and secrets from logs.

Current BLACKICE approach: Hope secrets aren't logged.

Why adopt: Defense in depth. Logs are often exposed.

Implementation sketch:

import re

class SecretMasker:
    """Mask secrets in log output."""

    PATTERNS = [
        (r"sk-[a-zA-Z0-9]{48}", "sk-***REDACTED***"),  # OpenAI
        (r"sk-ant-[a-zA-Z0-9-]{95}", "sk-ant-***REDACTED***"),  # Anthropic
        (r"AKIA[A-Z0-9]{16}", "AKIA***REDACTED***"),  # AWS
        (r"ghp_[a-zA-Z0-9]{36}", "ghp_***REDACTED***"),  # GitHub
        (r"password\s*[:=]\s*\S+", "password: ***REDACTED***"),
        (r"token\s*[:=]\s*\S+", "token: ***REDACTED***"),
    ]

    def mask(self, text: str) -> str:
        for pattern, replacement in self.PATTERNS:
            text = re.sub(pattern, replacement, text, flags=re.IGNORECASE)
        return text

# Integrate with logging
class MaskedLogger:
    def __init__(self, masker: SecretMasker):
        self.masker = masker

    def info(self, msg: str, **kwargs):
        masked_msg = self.masker.mask(msg)
        masked_kwargs = {k: self.masker.mask(str(v)) for k, v in kwargs.items()}
        logger.info(masked_msg, **masked_kwargs)
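
A runnable demonstration of the masking pass, using a fabricated GitHub-style token (36 characters after `ghp_`) and a password field; no real secrets appear:

```python
import re

PATTERNS = [
    (r"ghp_[a-zA-Z0-9]{36}", "ghp_***REDACTED***"),
    (r"password\s*[:=]\s*\S+", "password: ***REDACTED***"),
]

def mask(text: str) -> str:
    for pattern, replacement in PATTERNS:
        text = re.sub(pattern, replacement, text, flags=re.IGNORECASE)
    return text

log_line = "auth with ghp_" + "a" * 36 + " and password=hunter2"
masked = mask(log_line)
print(masked)
```

Both the token and the password value are replaced before the line ever reaches a log sink, so a leaked log file exposes nothing.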

Effort: Low

Verdict: YES - Essential for production.


5. Output Formatter Abstraction

What it is: Separate output formatting from orchestration logic.

Current BLACKICE approach: CLI output only.

Why adopt: Same orchestration → different outputs (console, JSON, dashboard).

Implementation sketch:

import json
from abc import ABC, abstractmethod

class OutputFormatter(ABC):
    @abstractmethod
    def task_started(self, task: Task): pass

    @abstractmethod
    def iteration_complete(self, iteration: int, result: IterationResult): pass

    @abstractmethod
    def task_complete(self, result: TaskResult): pass

class ConsoleFormatter(OutputFormatter):
    def task_started(self, task: Task):
        print(f"🚀 Starting: {task.name}")

    def iteration_complete(self, iteration: int, result: IterationResult):
        status = "✅" if result.success else "⏳"
        print(f"  {status} Iteration {iteration}: {result.summary}")

    def task_complete(self, result: TaskResult):
        print(f"🏁 Complete: {result.status}")

class JSONFormatter(OutputFormatter):
    def task_started(self, task: Task):
        print(json.dumps({"event": "started", "task": task.id}))

    def iteration_complete(self, iteration: int, result: IterationResult):
        print(json.dumps({"event": "iteration", "n": iteration, "success": result.success}))

    def task_complete(self, result: TaskResult):
        print(json.dumps({"event": "complete", "result": result.__dict__}))

class WebSocketFormatter(OutputFormatter):
    def __init__(self, ws: WebSocket):
        self.ws = ws

    async def task_started(self, task: Task):
        await self.ws.send_json({"event": "started", "task": task.id})

Effort: Low

Verdict: YES - Clean separation of concerns.


Ideas NOT Worth Adopting

TypeScript Core

Why skip: BLACKICE is Python. Don't fragment the codebase.

ACP Adapter Complexity

Why skip: BLACKICE's simpler adapter pattern is sufficient.


Summary

| Feature | Worth Adopting? | Effort | Priority |
|---|---|---|---|
| Completion Marker Detection | YES | Low | High |
| Security Masking | YES | Low | High |
| Scratchpad Persistence | YES | Low | Medium |
| Adaptive Permissions | YES | Medium | Medium |
| Output Formatter Abstraction | YES | Low | Low |


<!-- Source Gist 18 of 19: 4442ce070009cc6674820a517b64a8a3 -->

Oh-My-OpenCode Ideas for BLACKICE

Oh-My-OpenCode Ideas for BLACKICE

Ideas from Oh-My-OpenCode ("Sisyphus") for BLACKICE.

What is Oh-My-OpenCode?

A plugin harness for OpenCode enabling coordinated multi-agent workflows with specialized agents and curated tools.

| Aspect | Oh-My-OpenCode | BLACKICE |
|---|---|---|
| Focus | Agent orchestration with role-based models | Iterate-until-success with consensus |
| Platform | OpenCode plugin | Python CLI |
| Agents | Oracle, Frontend Engineer, Librarian, Explorer | Supervisor, Consensus, Workers |
| Model Routing | Role-based (GPT for strategy, Gemini for visual) | LLMRouter based on task type |

Key Features

  1. Role-Based Model Assignment - Different models for different tasks
  2. LSP/AST-Driven Tools - Surgical refactoring, not naive text manipulation
  3. Background Agent Delegation - Reduce main agent context overhead
  4. Todo-Driven Enforcement - Force continuation if agents quit halfway
  5. MCP Integration - External tools without bloating prompts

Ideas Worth Adopting

1. Role-Based Model Assignment

What it is: Assign models by purpose, not just "pick the smartest."

Current BLACKICE approach: LLMRouter selects based on task complexity.

Why adopt: Different models excel at different things. Claude for architecture, GPT for strategic thinking, Ollama for fast iteration.

Implementation sketch:

from dataclasses import dataclass

@dataclass
class AgentRole:
    name: str
    purpose: str
    preferred_model: str
    fallback_models: list[str]

AGENT_ROLES = {
    "architect": AgentRole(
        name="Architect",
        purpose="System design and high-level decisions",
        preferred_model="claude-opus-4-5-20251101",
        fallback_models=["gpt-4o", "claude-sonnet-4-20250514"]
    ),
    "implementer": AgentRole(
        name="Implementer",
        purpose="Write and modify code",
        preferred_model="claude-sonnet-4-20250514",
        fallback_models=["ollama/qwen2.5-coder:32b"]
    ),
    "reviewer": AgentRole(
        name="Reviewer",
        purpose="Code review and security audit",
        preferred_model="gpt-4o",
        fallback_models=["claude-sonnet-4-20250514"]
    ),
    "explorer": AgentRole(
        name="Explorer",
        purpose="Codebase analysis and documentation",
        preferred_model="ollama/qwen2.5-coder:7b",  # Fast, cheap
        fallback_models=["claude-sonnet-4-20250514"]
    ),
}

class RoleBasedRouter:
    async def route(self, task: Task, role: str) -> str:
        agent_role = AGENT_ROLES[role]
        for model in [agent_role.preferred_model] + agent_role.fallback_models:
            if await self.is_available(model):
                return model
        raise NoModelAvailable(f"No model available for role {role}")
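
The fallback walk is "preferred first, then each fallback, return the first available". A toy version where availability is simulated with a set (model names copied from the role table above):

```python
def route(preferred: str, fallbacks: list[str], available: set[str]) -> str:
    """Return the first model in preference order that is available."""
    for model in [preferred] + fallbacks:
        if model in available:
            return model
    raise RuntimeError("no model available for this role")

# Simulate the preferred model being down.
available = {"ollama/qwen2.5-coder:32b", "gpt-4o"}
chosen = route("claude-sonnet-4-20250514", ["ollama/qwen2.5-coder:32b"], available)
print(chosen)  # → ollama/qwen2.5-coder:32b
```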

Effort: Low - extends existing LLMRouter

Verdict: YES - More nuanced than simple complexity-based routing.


2. LSP/AST-Driven Refactoring

What it is: Use Language Server Protocol and AST tools for surgical code changes.

Current BLACKICE approach: Text-based code generation.

Why adopt: Deterministic, safer transformations. Don't break code with regex.

Implementation sketch:

from ast_grep_py import SgRoot

class ASTRefactorer:
    """Surgical code refactoring using AST patterns."""

    def rename_function(self, file_path: Path, old_name: str, new_name: str):
        """Rename function across file using AST."""
        code = file_path.read_text()
        # SgRoot wraps the source; root() yields the searchable node.
        root = SgRoot(code, "python").root()

        # Find all call sites of the old function ($$$ARGS matches any args)
        matches = root.find_all(pattern=f"{old_name}($$$ARGS)")

        # Replace with new name
        for match in matches:
            # Safe replacement preserving structure
            pass

    def extract_method(self, file_path: Path, start_line: int, end_line: int, new_name: str):
        """Extract lines into new method with proper imports."""
        # Use LSP to find dependencies
        # Generate method with correct signature
        pass

class LSPClient:
    """Language Server Protocol client for code intelligence."""

    async def find_references(self, file: Path, line: int, col: int) -> list[Location]:
        """Find all references to symbol."""
        pass

    async def get_definition(self, file: Path, line: int, col: int) -> Location:
        """Jump to definition."""
        pass

    async def rename_symbol(self, file: Path, line: int, col: int, new_name: str) -> list[Edit]:
        """Rename symbol across project."""
        pass

Effort: Medium-High - requires LSP infrastructure

Verdict: YES - Essential for reliable refactoring.


3. Background Agent Delegation

What it is: Spawn cheap agents to process raw data, main agent works with summaries.

Current BLACKICE approach: Single agent processes everything.

Why adopt: Reduce context consumption. Main agent stays focused.

Implementation sketch:

class BackgroundDelegator:
    """Delegate heavy processing to background agents."""

    async def digest_codebase(self, paths: list[Path]) -> str:
        """Have background agent summarize codebase."""
        # Spawn cheap Ollama agent
        background_agent = Agent(
            model="ollama/qwen2.5-coder:7b",
            purpose="Summarize code files"
        )

        summaries = []
        for path in paths:
            code = path.read_text()
            summary = await background_agent.run(
                f"Summarize this file in 2-3 sentences:\n{code}"
            )
            summaries.append(f"## {path}\n{summary}")

        return "\n\n".join(summaries)

    async def research_topic(self, topic: str) -> str:
        """Have background agent do web research."""
        research_agent = Agent(
            model="ollama/qwen2.5:7b",
            tools=["web_search", "fetch_url"]
        )

        findings = await research_agent.run(
            f"Research {topic} and provide a summary with key points."
        )
        return findings

# Main agent usage
async def solve_task(task: Task):
    # Background agent digests codebase
    codebase_summary = await delegator.digest_codebase(task.relevant_files)

    # Main agent works with summary, not raw code
    main_agent = Agent(model="claude-sonnet-4-20250514")
    result = await main_agent.run(
        f"Task: {task.description}\n\nCodebase context:\n{codebase_summary}"
    )

Effort: Medium - new delegation pattern

Verdict: YES - Token efficiency is critical for long tasks.


4. Continuation Enforcement

What it is: Force agents to continue if they quit halfway.

Current BLACKICE approach: Ralph Loop retries on failure, but not on premature quit.

Why adopt: Agents sometimes give up too early. Force completion.

Implementation sketch:

class ContinuationEnforcer:
    """Ensure agents complete their work."""

    QUIT_PATTERNS = [
        "I cannot complete this",
        "This is beyond my capabilities",
        "I'll stop here",
        "Let me know if you need",
    ]

    def detect_premature_quit(self, response: str) -> bool:
        """Check if agent quit prematurely."""
        for pattern in self.QUIT_PATTERNS:
            if pattern.lower() in response.lower():
                return True
        return False

    async def enforce_continuation(self, agent: Agent, task: Task, response: str) -> str:
        """If agent quit, push them to continue."""
        if not self.detect_premature_quit(response):
            return response

        # f-string, not str.format: format() cannot evaluate the slice
        # in {response[-200:]}.
        continuation_prompt = f"""
You stopped before completing the task. This is not acceptable.

Original task: {task.description}

Your incomplete response ended with: "{response[-200:]}"

Continue from where you left off. Complete the task fully.
Do not apologize. Do not explain limitations. Just do the work.
"""
        return await agent.run(continuation_prompt)
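
The quit check itself is a case-insensitive substring scan over the phrase list. A self-contained demo (quit phrases mirror those listed above; sample responses are invented):

```python
QUIT_PATTERNS = [
    "i cannot complete this",
    "i'll stop here",
    "let me know if you need",
]

def quit_detected(response: str) -> bool:
    lowered = response.lower()
    return any(phrase in lowered for phrase in QUIT_PATTERNS)

print(quit_detected("I've set up the scaffolding. Let me know if you need more."))  # → True
print(quit_detected("All tests pass; migration complete."))                          # → False
```

Substring matching is deliberately blunt: a false positive just costs one extra continuation prompt, while a false negative wastes a whole iteration.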

Effort: Low

Verdict: YES - Prevents wasted iterations.


5. Multimodal Context Compression

What it is: Use screenshots instead of raw code for UI work.

Current BLACKICE approach: Text-only context.

Why adopt: Screenshots can convey layout faster than code. Saves tokens.

Implementation sketch:

import subprocess
from pathlib import Path
from uuid import uuid4

class MultimodalContext:
    """Use images to reduce text context."""

    async def capture_ui_state(self, url: str) -> Path:
        """Capture screenshot of web UI."""
        screenshot_path = Path(f"/tmp/ui-{uuid4()}.png")
        subprocess.run([
            "playwright", "screenshot", url, str(screenshot_path)
        ])
        return screenshot_path

    async def capture_terminal(self) -> Path:
        """Capture terminal output as image."""
        screenshot_path = Path(f"/tmp/term-{uuid4()}.png")
        subprocess.run(["screencapture", "-l", str(screenshot_path)])
        return screenshot_path

    async def analyze_with_vision(self, image: Path, question: str) -> str:
        """Use vision model to analyze image."""
        agent = Agent(model="gpt-4o")  # Vision-capable
        return await agent.run_with_image(image, question)

Effort: Low-Medium

Verdict: MAYBE - Useful for UI work, not general coding.


Ideas NOT Worth Adopting

OpenCode Lock-in

Why skip: BLACKICE should remain framework-agnostic.

Complex Plugin Architecture

Why skip: BLACKICE's skill system is simpler and sufficient.


Summary

| Feature | Worth Adopting? | Effort | Priority |
|---|---|---|---|
| Continuation Enforcement | YES | Low | High |
| Role-Based Model Assignment | YES | Low | High |
| Background Agent Delegation | YES | Medium | Medium |
| LSP/AST Refactoring | YES | High | Medium |
| Multimodal Context | MAYBE | Low | Low |


<!-- Source Gist 19 of 19: 9569ccc3aa932d75f19d702b9d945f4c -->

BLACKICE - Complete System Documentation (Context Drop)

BLACKICE - Complete System Documentation

A comprehensive context drop for the BLACKICE autonomous software development system.

Last Updated: January 2026
Total Lines: 77,113 Python (source + tests)
Repository: github.com/jmanhype/blackice


Table of Contents

  1. What Is BLACKICE?
  2. Core Philosophy
  3. Architecture Overview
  4. Key Components
  5. File Structure
  6. Data Models
  7. Execution Flow
  8. Configuration
  9. Deployment
  10. API Reference
  11. Comparison to Alternatives
  12. Future Work

What Is BLACKICE?

BLACKICE is a Ralph Loop implementation with multi-agent consensus, crash recovery, and enterprise observability.

One sentence: You describe a task → BLACKICE coordinates multiple LLM agents → they iterate with self-reflection until success → working code is delivered.

The Name

| Term | Meaning |
|---|---|
| BLACKICE | Project/repo name |
| Ralph Loop | Core pattern: iterate until success with learning |
| EnterpriseFlywheel | Main orchestrator class (186KB, 4500+ lines) |
| Service Colony | Academic foundation (arXiv:2407.07267) |

What It Does

Input:  "Write a REST API for user authentication"
        ↓
BLACKICE:
  1. Routes task to best LLM (Claude/Ollama/Letta)
  2. Spins up agents in isolated git worktrees
  3. Agents propose solutions
  4. Consensus voting selects best approach
  5. SafetyGuard prevents infinite loops
  6. CostTracker enforces token/time budgets
  7. Beads logs everything for crash recovery
  8. Reflexion learns from failures
  9. Iterate until validation passes
        ↓
Output: Working code, committed to repo

Core Philosophy

The Ralph Pattern

From ghuntley.com/ralph:

"Keep trying different approaches until you succeed, learning from each failure."

┌─────────────────────────────────────────────┐
│                 RALPH LOOP                   │
│                                              │
│   ┌──────┐    ┌──────┐    ┌──────────┐      │
│   │ TRY  │───▶│ FAIL │───▶│ REFLECT  │      │
│   └──────┘    └──────┘    └────┬─────┘      │
│       ▲                        │            │
│       │                        ▼            │
│       │                 ┌──────────┐        │
│       └─────────────────│  LEARN   │        │
│                         └──────────┘        │
│                                              │
│   Until: SUCCESS or MAX_ITERATIONS          │
└─────────────────────────────────────────────┘
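The loop in the diagram can be sketched in a few lines. This is a minimal illustration only, assuming the caller supplies an `attempt` callable (tries the task against accumulated lessons) and a `reflect` callable (turns a failure into a lesson); neither name is part of the BLACKICE API.

```python
def ralph_loop(attempt, reflect, max_iterations=10):
    """Iterate until `attempt` succeeds, feeding lessons from each failure back in."""
    lessons = []
    for _ in range(max_iterations):
        ok, result = attempt(lessons)    # TRY
        if ok:
            return result                # SUCCESS
        lessons.append(reflect(result))  # FAIL -> REFLECT -> LEARN
    raise RuntimeError("max iterations reached")
```

Each failed attempt enriches `lessons`, so later attempts run with strictly more context than earlier ones.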

Multi-Agent Consensus

Unlike single-agent systems, BLACKICE uses multiple agents voting on solutions:

| Strategy | Description | Use Case |
|---|---|---|
| majority | >50% approval | Default for most tasks |
| supermajority | >66% approval | Critical changes |
| unanimous | 100% approval | Security-sensitive |
| quorum | Minimum voters required | Large agent pools |
| first_n | First N approvals win | Fast iteration |
| weighted | Weighted by agent expertise | Specialist tasks |

Crash Recovery

All state is persisted to Beads (append-only SQLite event store):

Agent crashes mid-task
        ↓
RecoveryManager reads Beads
        ↓
Reconstructs state from events
        ↓
New agent continues from last checkpoint
        ↓
Task completes successfully
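The reconstruction step above is an event-sourcing fold: replay the append-only log and reduce it into current state. A minimal sketch, assuming events arrive as `(type, data)` pairs in append order; the field names are illustrative, not the real Beads schema.

```python
def rebuild_state(events):
    """Fold an ordered event stream back into task state."""
    state = {"status": "unknown", "completed": []}
    for etype, data in events:
        if etype == "task_started":
            state["status"] = "running"
        elif etype == "subtask_done":
            # Completed subtasks are skipped on resume
            state["completed"].append(data["id"])
        elif etype == "task_succeeded":
            state["status"] = "succeeded"
    return state
```

Because the log is append-only, replay is deterministic: any new agent reducing the same events arrives at the same state.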

Architecture Overview

12-Layer Stack

┌─────────────────────────────────────────────────────────────────┐
│ Layer 11: CLI                                                    │
│           ralph_cli.py - Command-line interface                  │
├─────────────────────────────────────────────────────────────────┤
│ Layer 10: Orchestrator                                           │
│           orchestrator.py - High-level task coordination         │
├─────────────────────────────────────────────────────────────────┤
│ Layer 9:  EnterpriseFlywheel                                     │
│           enterprise_flywheel.py - Unified orchestrator (186KB)  │
│           Integrates ALL capabilities into single entry point    │
├─────────────────────────────────────────────────────────────────┤
│ Layer 8:  Reflexion                                              │
│           reflexion.py - Self-improvement loop                   │
│           Multi-dimensional quality scoring                      │
├─────────────────────────────────────────────────────────────────┤
│ Layer 7:  Recovery                                               │
│           recovery_manager.py - Crash recovery from Beads        │
│           dead_letter_queue.py - Failed task handling            │
│           worktree_pool.py - Git worktree isolation              │
├─────────────────────────────────────────────────────────────────┤
│ Layer 6:  Persistence                                            │
│           beads.py - Append-only SQLite event store (40+ types)  │
│           semantic_memory.py - Letta embeddings                  │
│           artifact_store.py - S3/MinIO storage                   │
├─────────────────────────────────────────────────────────────────┤
│ Layer 5:  Instrumentation                                        │
│           tracer.py - OpenTelemetry distributed tracing          │
│           metrics.py - Prometheus counters/histograms            │
│           logger.py - Structured JSON logging (structlog)        │
│           safety_guard.py - Policy enforcement, loop detection   │
│           cost_tracker.py - Token/time budget management         │
├─────────────────────────────────────────────────────────────────┤
│ Layer 4:  Service Colony                                         │
│           agents/supervisor.py - Task decomposition              │
│           agents/consensus.py - 6 voting strategies              │
│           agents/mail.py - Inter-agent messaging                 │
│           agents/registry.py - Capability-based routing          │
├─────────────────────────────────────────────────────────────────┤
│ Layer 3:  Core Loop                                              │
│           loop.py - Ralph iterate-until-success pattern          │
│           retry.py - Exponential backoff with jitter             │
│           cancellation.py - Timeout and cancellation             │
├─────────────────────────────────────────────────────────────────┤
│ Layer 2:  Adapters                                               │
│           adapters/claude_code.py - Claude Code CLI              │
│           adapters/claude_proxy.py - Claude via AI Factory       │
│           adapters/ollama.py - Local Ollama inference            │
│           adapters/letta.py - Persistent memory agents           │
│           adapters/codex.py - OpenAI Codex                       │
├─────────────────────────────────────────────────────────────────┤
│ Layer 1:  Dispatcher                                             │
│           dispatcher.py - Route to ai-factory/speckit/LLM        │
│           router.py - Smart model selection                      │
├─────────────────────────────────────────────────────────────────┤
│ Layer 0:  Infrastructure                                         │
│           ai-factory/ - Docker Compose (Ollama, Letta, Postgres) │
└─────────────────────────────────────────────────────────────────┘

Component Diagram

                    ┌─────────────────────────────────────┐
                    │         EnterpriseFlywheel          │
                    │         (Unified Orchestrator)       │
                    └─────────────────┬───────────────────┘
                                      │
        ┌─────────────────────────────┼─────────────────────────────┐
        │                             │                             │
        ▼                             ▼                             ▼
┌───────────────┐           ┌─────────────────┐           ┌─────────────────┐
│  SafetyGuard  │           │   CostTracker   │           │   SmartRouter   │
│  - Policies   │           │  - Token budget │           │ - Model select  │
│  - Loop detect│           │  - Time budget  │           │ - Capability    │
└───────────────┘           └─────────────────┘           └─────────────────┘
        │                             │                             │
        └─────────────────────────────┼─────────────────────────────┘
                                      │
                                      ▼
                    ┌─────────────────────────────────────┐
                    │           DAGExecutor               │
                    │     (Parallel Workflow Engine)      │
                    └─────────────────┬───────────────────┘
                                      │
        ┌─────────────────────────────┼─────────────────────────────┐
        │                             │                             │
        ▼                             ▼                             ▼
┌───────────────┐           ┌─────────────────┐           ┌─────────────────┐
│ WorktreePool  │           │    Consensus    │           │   AgentMail     │
│ - Git isolate │           │  - 6 strategies │           │ - Request/reply │
│ - Per task    │           │  - Vote collect │           │ - Broadcast     │
└───────────────┘           └─────────────────┘           └─────────────────┘
        │                             │                             │
        └─────────────────────────────┼─────────────────────────────┘
                                      │
                                      ▼
                    ┌─────────────────────────────────────┐
                    │           Adapters Layer            │
                    │  Claude │ Ollama │ Letta │ Codex    │
                    └─────────────────┬───────────────────┘
                                      │
        ┌─────────────────────────────┼─────────────────────────────┐
        │                             │                             │
        ▼                             ▼                             ▼
┌───────────────┐           ┌─────────────────┐           ┌─────────────────┐
│    Beads      │           │ RecoveryManager │           │  DeadLetterQ    │
│ - Event store │           │ - Crash recover │           │ - Failed tasks  │
│ - 40+ types   │           │ - State replay  │           │ - Retry policy  │
└───────────────┘           └─────────────────┘           └─────────────────┘

Key Components

1. EnterpriseFlywheel (enterprise_flywheel.py)

The unified orchestrator that brings everything together. 186KB, 4500+ lines.

from integrations.ralph.enterprise_flywheel import (
    EnterpriseFlywheel,
    EnterpriseFlywheelConfig,
    EnterpriseTask,
)

config = EnterpriseFlywheelConfig(
    ollama_url="http://192.168.1.143:11434",
    letta_url="http://192.168.1.143:8283",
    claude_proxy_url="http://192.168.1.143:42069",
    max_iterations=10,
    max_tokens_per_task=100_000,
)

flywheel = EnterpriseFlywheel(config)

task = EnterpriseTask(
    id="example",
    name="Code Generation",
    description="Write a function to calculate fibonacci",
)

result = await flywheel.execute_task(task)

Key capabilities:

  • LLMRouter for intelligent model selection
  • DAGExecutor for parallel workflow execution
  • WorktreePool for git worktree isolation per task
  • RecoveryManager for crash recovery from Beads events
  • DeadLetterQueue for failed task handling with retry
  • SafetyGuard for policy enforcement and loop detection
  • CostTracker for token/time budget management
  • LettaAdapter for persistent memory across sessions
  • ReflexionLoop for multi-dimensional quality scoring

2. Beads (beads.py)

Append-only SQLite event store with 40+ event types.

from integrations.ralph.beads import BeadsStore, EventType, EntityType

beads = BeadsStore(Path("~/.beads/beads.db").expanduser())

# Emit events
beads.emit(
    event_type=EventType.TASK_STARTED,
    entity_type=EntityType.TASK,
    entity_id="task-123",
    data={"description": "Write fibonacci"},
)

# Query events
events = beads.query_by_entity("task-123")

# Replay for recovery
for event in beads.replay_from(checkpoint_id):
    apply_event(event)

Event Types (40+):

  • Run: RUN_STARTED, RUN_COMPLETED, RUN_FAILED, RUN_STATE_TRANSITION
  • Task: TASK_QUEUED, TASK_STARTED, TASK_SUCCEEDED, TASK_FAILED, TASK_RETRY
  • Mail: MAIL_SENT, MAIL_DELIVERED, MAIL_ACKED, MAIL_EXPIRED
  • Workspace: GIT_CHECKPOINT_CREATED, WORKTREE_ACQUIRED, WORKTREE_RELEASED
  • Consensus: PROPOSAL_CREATED, VOTE_CAST, CONSENSUS_REACHED

3. Consensus (agents/consensus.py)

6 voting strategies for multi-agent decision making.

from integrations.ralph.agents.consensus import (
    ConsensusEngine,
    ConsensusStrategy,
    Proposal,
    VoteValue,
)

engine = ConsensusEngine(strategy=ConsensusStrategy.MAJORITY)

# Create proposal
proposal = engine.create_proposal(
    proposer="agent-1",
    content={"solution": "Use recursion"},
    timeout_seconds=60,
)

# Agents vote
engine.cast_vote(proposal.id, "agent-2", VoteValue.APPROVE, "Clean solution")
engine.cast_vote(proposal.id, "agent-3", VoteValue.APPROVE, "Efficient")
engine.cast_vote(proposal.id, "agent-4", VoteValue.REJECT, "Prefer iteration")

# Check result
result = engine.get_result(proposal.id)
# result.status = ProposalStatus.APPROVED (3 approve > 1 reject)

Strategies:

| Strategy | Rule |
|---|---|
| MAJORITY | >50% approve |
| SUPERMAJORITY | >66% approve |
| UNANIMOUS | 100% approve |
| QUORUM | Minimum N voters, then majority |
| FIRST_N | First N approvals win |
| WEIGHTED | Sum of weights, threshold |
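The WEIGHTED rule is the only one not shown in the example above. An illustrative tally (not the ConsensusEngine API): each vote carries its agent's weight, and the proposal passes when approving weight exceeds a threshold fraction of total weight.

```python
def weighted_result(votes, threshold=0.5):
    """votes: list of (approve: bool, weight: float). True if approval weight
    exceeds `threshold` as a fraction of total weight cast."""
    total = sum(w for _, w in votes)
    approving = sum(w for ok, w in votes if ok)
    return approving / total > threshold
```

With weights of 3 and 1, a single specialist's approval outvotes a generalist's rejection, which is the point of the strategy.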

4. Adapters

Multi-LLM support - not locked to any single provider.

# Claude via AI Factory proxy
from integrations.ralph.adapters.claude_proxy import ClaudeProxyAdapter
claude = ClaudeProxyAdapter(url="http://192.168.1.143:42069")

# Local Ollama
from integrations.ralph.adapters.ollama import OllamaAdapter
ollama = OllamaAdapter(url="http://192.168.1.143:11434")

# Letta (persistent memory)
from integrations.ralph.adapters.letta import LettaAdapter
letta = LettaAdapter(url="http://192.168.1.143:8283")

# OpenAI Codex
from integrations.ralph.adapters.codex import CodexAdapter
codex = CodexAdapter(api_key="...")

Adapter interface:

class BaseAdapter:
    async def execute(self, prompt: str, **kwargs) -> AdapterResult:
        """Execute a prompt and return the result."""
        
    async def health_check(self) -> bool:
        """Check if the adapter is healthy."""
        
    def get_capabilities(self) -> list[str]:
        """Return list of capabilities (coding, reasoning, etc)."""

5. Instrumentation

Enterprise observability with OpenTelemetry, Prometheus, and structlog.

# Tracing
from integrations.ralph.instrumentation.tracer import RalphTracer

tracer = RalphTracer(service_name="blackice")
with tracer.span("execute_task", attributes={"task_id": "123"}):
    result = await run_task()

# Metrics
from integrations.ralph.instrumentation.metrics import RalphMetrics

metrics = RalphMetrics(port=9090)
metrics.task_started("task-123")
metrics.tokens_used(1500, model="claude-3-5-sonnet")
metrics.task_completed("task-123", duration_ms=5000)

# Structured Logging
from integrations.ralph.instrumentation.logger import get_logger

log = get_logger("ralph.loop")
log.info("task_started", task_id="123", model="claude")
# Output: {"event": "task_started", "task_id": "123", "model": "claude", "timestamp": "..."}

6. Safety & Cost

Policy enforcement and budget management.

# SafetyGuard
from integrations.ralph.instrumentation.safety_guard import SafetyGuard

guard = SafetyGuard(
    max_iterations=10,
    loop_detection_threshold=3,
    allowed_policies=["default"],
)

decision = guard.evaluate(checkpoint=Checkpoint.BEFORE_ITERATION, context={...})
if decision.action == SafetyAction.ABORT:
    raise SafetyViolation(decision.reason)

# CostTracker
from integrations.ralph.instrumentation.cost_tracker import CostTracker

tracker = CostTracker(
    max_tokens=100_000,
    max_time_seconds=600,
)

tracker.record_tokens("task-123", 1500)
if not tracker.can_continue("task-123"):
    raise BudgetExceeded()

File Structure

blackice/
├── integrations/ralph/              # THE SYSTEM (77K lines)
│   ├── __init__.py                  # Public API exports
│   ├── enterprise_flywheel.py       # Unified orchestrator (186KB)
│   ├── loop.py                      # Ralph loop core (68KB)
│   ├── beads.py                     # Event store (28KB)
│   ├── models.py                    # Data models (17KB)
│   │
│   ├── agents/                      # Service Colony patterns
│   │   ├── supervisor.py            # Task decomposition (61KB)
│   │   ├── consensus.py             # 6 voting strategies (24KB)
│   │   ├── mail.py                  # Inter-agent messaging (25KB)
│   │   └── registry.py              # Capability routing (29KB)
│   │
│   ├── adapters/                    # LLM backends
│   │   ├── base.py                  # Adapter interface
│   │   ├── claude_code.py           # Claude Code CLI
│   │   ├── claude_proxy.py          # Claude via AI Factory
│   │   ├── ollama.py                # Local Ollama
│   │   ├── letta.py                 # Persistent memory
│   │   └── codex.py                 # OpenAI Codex
│   │
│   ├── instrumentation/             # Observability (Layer 5)
│   │   ├── tracer.py                # OpenTelemetry (10KB)
│   │   ├── metrics.py               # Prometheus (15KB)
│   │   ├── logger.py                # Structlog (10KB)
│   │   ├── safety_guard.py          # Policy enforcement (6KB)
│   │   ├── cost_tracker.py          # Budget management (3KB)
│   │   └── fingerprint.py           # Loop detection (3KB)
│   │
│   ├── recovery_manager.py          # Crash recovery (13KB)
│   ├── dead_letter_queue.py         # Failed task handling (13KB)
│   ├── worktree_pool.py             # Git isolation (12KB)
│   ├── dag_executor.py              # Parallel workflows (31KB)
│   ├── worker_pool.py               # Agent workers (26KB)
│   ├── reflexion.py                 # Self-improvement (23KB)
│   ├── semantic_memory.py           # Letta embeddings (22KB)
│   ├── retry.py                     # Exponential backoff (19KB)
│   ├── agent_mail.py                # Messaging (27KB)
│   ├── artifact_store.py            # S3 storage (17KB)
│   ├── git_checkpoint.py            # Git checkpoints (20KB)
│   ├── cancellation.py              # Timeouts (21KB)
│   │
│   ├── cli/                         # CLI interface
│   │   └── ...
│   │
│   ├── tests/                       # Test suite (33 files)
│   │   ├── test_enterprise_flywheel.py
│   │   ├── test_consensus.py
│   │   ├── test_beads.py
│   │   └── ... (30 more)
│   │
│   └── config/                      # Configuration
│       └── ...
│
├── ai-factory/                      # Docker infrastructure (submodule)
│   ├── docker-compose.yml           # Container definitions
│   ├── litellm-config.yaml          # LLM routing
│   └── llmrouter/                   # Model selection service
│
├── specs/service-colony/            # SpecKit documentation
│   ├── spec.md                      # WHAT/WHY
│   ├── plan.md                      # HOW
│   ├── tasks.md                     # 51 implementation tasks
│   ├── checklist.md                 # Quality gates (73%)
│   ├── deployment.md                # AI Factory setup
│   ├── troubleshooting.md           # Common issues
│   └── whitepaper.md                # Technical white paper
│
└── .claude/skills/                  # Claude Code skills

Data Models

Core Types

class RunState(str, Enum):
    """State machine for task execution."""
    INIT = "init"
    PLANNING = "planning"
    RUNNING = "running"
    CHECKPOINTING = "checkpointing"
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    ABORTED = "aborted"
    PAUSED = "paused"
    RESUMING = "resuming"
    ROLLING_BACK = "rolling_back"

class TaskType(str, Enum):
    """Task classification for routing."""
    CODING = "coding"
    PLANNING = "planning"
    REASONING = "reasoning"
    GENERATION = "generation"
    VALIDATION = "validation"

class AttemptOutcome(str, Enum):
    """Result of an execution attempt."""
    SUCCESS = "success"
    FAILURE = "failure"
    ERROR = "error"
    TIMEOUT = "timeout"

@dataclass
class LoopConfig:
    """Configuration for Ralph Loop."""
    max_iterations: int = 100
    memory_agent_id: str = "agent-xxx"
    default_model: str = "qwen2.5-coder:7b"
    validation_timeout: int = 30
    refinement_mode: Literal["auto", "manual", "disabled"] = "auto"

@dataclass
class EnterpriseTask:
    """A task to be executed by the flywheel."""
    id: str
    name: str
    description: str
    task_type: TaskType = TaskType.CODING
    priority: int = 5
    dependencies: list[str] = field(default_factory=list)
    metadata: dict[str, Any] = field(default_factory=dict)

Event Types

class EventType(str, Enum):
    """40+ event types for Beads."""
    # Run lifecycle
    RUN_STARTED = "run_started"
    RUN_STATE_TRANSITION = "run_state_transition"
    RUN_COMPLETED = "run_completed"
    RUN_FAILED = "run_failed"
    
    # Task lifecycle
    TASK_QUEUED = "task_queued"
    TASK_STARTED = "task_started"
    TASK_SUCCEEDED = "task_succeeded"
    TASK_FAILED = "task_failed"
    TASK_RETRY = "task_retry"
    
    # Consensus
    PROPOSAL_CREATED = "proposal_created"
    VOTE_CAST = "vote_cast"
    CONSENSUS_REACHED = "consensus_reached"
    
    # Workspace
    GIT_CHECKPOINT_CREATED = "git_checkpoint_created"
    WORKTREE_ACQUIRED = "worktree_acquired"
    WORKTREE_RELEASED = "worktree_released"
    
    # ... 30+ more

Execution Flow

Happy Path

1. CLI receives task
   └── ralph run "Write fibonacci function"

2. EnterpriseFlywheel.execute_task()
   ├── SafetyGuard.evaluate(START_OF_RUN)
   ├── CostTracker.start_tracking()
   └── WorktreePool.acquire()

3. SmartRouter.select_model()
   ├── Analyze task type (coding)
   ├── Check adapter health
   └── Return: "claude-3-5-sonnet"

4. DAGExecutor.execute()
   ├── Create execution graph
   └── Run nodes in parallel where possible

5. For each iteration:
   ├── SafetyGuard.evaluate(BEFORE_ITERATION)
   ├── Adapter.execute(prompt)
   ├── Beads.emit(TASK_PROGRESS)
   ├── Validator.validate(result)
   └── If failed: Reflexion.reflect() → refine prompt

6. Consensus (if multi-agent):
   ├── ConsensusEngine.create_proposal()
   ├── Agents cast votes
   └── ConsensusEngine.get_result()

7. On success:
   ├── Beads.emit(TASK_SUCCEEDED)
   ├── WorktreePool.release()
   ├── SemanticMemory.store_success()
   └── Return FlywheelResult

8. On failure:
   ├── Beads.emit(TASK_FAILED)
   ├── DeadLetterQueue.enqueue()
   └── Return error
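Step 5's per-iteration gating can be condensed into one loop. A sketch only: `guard`, `tracker`, `adapter`, `validator`, and `reflect` are hypothetical callables standing in for SafetyGuard, CostTracker, an LLM adapter, the validator, and the Reflexion refinement step.

```python
async def iterate(task, guard, tracker, adapter, validator, reflect, max_iters=10):
    """Run the per-iteration loop: gate, execute, validate, refine."""
    prompt = task
    for _ in range(max_iters):
        # Safety and budget checks run before every iteration
        if not guard(prompt) or not tracker():
            break
        result = await adapter(prompt)
        if validator(result):
            return result
        prompt = reflect(prompt, result)  # refine the prompt and retry
    return None
```

A `None` return corresponds to the failure path: the task is emitted as TASK_FAILED and handed to the DeadLetterQueue.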

Recovery Flow

1. Agent crashes mid-task

2. New agent starts
   └── RecoveryManager.create_recovery_plan()

3. RecoveryManager:
   ├── Query Beads for last run
   ├── Find last checkpoint
   └── Build recovery plan

4. EnterpriseFlywheel.recover()
   ├── For completed subtasks: skip
   ├── For pending subtasks: execute
   └── For failed subtasks: retry or DLQ

5. Continue from checkpoint
   └── Task completes

Configuration

Environment Variables

# Adapters
RALPH_CLAUDE_PROXY_URL=http://192.168.1.143:42069
RALPH_OLLAMA_URL=http://192.168.1.143:11434
RALPH_LETTA_URL=http://192.168.1.143:8283

# Limits
RALPH_MAX_ITERATIONS=100
RALPH_MAX_TOKENS=100000
RALPH_MAX_TIME_SECONDS=600

# Safety
RALPH_LOOP_DETECTION_THRESHOLD=3
RALPH_ALLOWED_POLICIES=default

# Memory
RALPH_MEMORY_AGENT_ID=agent-xxx
RALPH_BEADS_PATH=~/.beads/beads.db

# Observability
RALPH_TRACING_ENABLED=true
RALPH_METRICS_PORT=9090
RALPH_LOG_LEVEL=INFO
RALPH_LOG_JSON=true
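A sketch of reading these variables into a config object. The real EnterpriseFlywheelConfig fields may differ; the dataclass and its field names below mirror the env vars only for illustration.

```python
import os
from dataclasses import dataclass

@dataclass
class EnvConfig:
    ollama_url: str
    max_iterations: int
    max_tokens: int

def load_env_config(env=os.environ):
    """Build a config from RALPH_* environment variables, with defaults."""
    return EnvConfig(
        ollama_url=env.get("RALPH_OLLAMA_URL", "http://localhost:11434"),
        max_iterations=int(env.get("RALPH_MAX_ITERATIONS", "100")),
        max_tokens=int(env.get("RALPH_MAX_TOKENS", "100000")),
    )
```

Passing a plain dict as `env` makes the loader trivially testable without mutating the process environment.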

YAML Config (~/.ralph/config.yaml)

adapters:
  claude_proxy:
    url: "http://192.168.1.143:42069"
    default_model: "claude-3-5-haiku-20241022"
    timeout: 120
  ollama:
    url: "http://192.168.1.143:11434"
    default_model: "qwen2.5-coder:7b"
  letta:
    url: "http://192.168.1.143:8283"
    agent_id: "agent-xxx"

safety:
  max_iterations: 10
  loop_detection_threshold: 3
  allowed_policies:
    - default

cost:
  max_tokens_per_task: 100000
  max_time_per_task_seconds: 600

observability:
  tracing_enabled: true
  tracing_console_export: true
  metrics_enabled: true
  metrics_port: 9090
  logging_enabled: true
  logging_json: true

worktree:
  base_path: /tmp/ralph-worktrees
  max_pool_size: 10

Deployment

AI Factory (GPU Server)

cd ai-factory
cp .env.template .env
# Edit .env with API keys

docker compose up -d

# Services:
# - postgres-vector:5432   (vector database)
# - ollama:11434           (local LLM inference)
# - letta-server:8283      (persistent memory agents)
# - litellm:4000           (multi-provider proxy)
# - llmrouter:4001         (intelligent model selection)

# Pull models
ollama pull qwen2.5-coder:7b

Client (Workstation)

# Install dependencies
pip install -e integrations/ralph

# Configure
mkdir -p ~/.ralph
cp config.example.yaml ~/.ralph/config.yaml
# Edit with your endpoints

# Run a task
python -m integrations.ralph.cli run "Write hello world in Python"

# Check status
python -m integrations.ralph.cli status

# View dashboard
python -m integrations.ralph.cli dashboard

Health Check

# Check all services
curl -s http://192.168.1.143:11434/api/tags && echo "Ollama OK"
curl -s http://192.168.1.143:8283/v1/health && echo "Letta OK"
curl -s http://192.168.1.143:42069/v1/models && echo "Claude Proxy OK"

API Reference

CLI Commands

# Run a task
ralph run "Write a REST API for users"
ralph run --model claude "Complex reasoning task"
ralph run --parallel 3 "Generate test cases"

# Status and monitoring
ralph status                    # Current task status
ralph dashboard                 # TUI monitoring
ralph logs --tail 100           # Recent logs

# Dead Letter Queue
ralph dlq list                  # View failed tasks
ralph dlq retry <task_id>       # Retry a task
ralph dlq purge --expired       # Clean up

# Recovery
ralph recover                   # Recover from crash
ralph recover --from-checkpoint <id>

Python API

from integrations.ralph import (
    EnterpriseFlywheel,
    EnterpriseFlywheelConfig,
    EnterpriseTask,
    TaskType,
)

# Initialize
config = EnterpriseFlywheelConfig(...)
flywheel = EnterpriseFlywheel(config)

# Execute task
task = EnterpriseTask(
    id="task-1",
    name="Generate Code",
    description="Write a fibonacci function",
    task_type=TaskType.CODING,
)
result = await flywheel.execute_task(task)

# Check result
if result.success:
    print(f"Output: {result.output}")
    print(f"Files: {result.files_changed}")
else:
    print(f"Failed: {result.error}")

# Recovery
plan = await flywheel.recover()
print(f"Recovered {len(plan.completed)} tasks")

Comparison to Alternatives

vs Gas Town (Steve Yegge)

| Aspect | BLACKICE | Gas Town |
|---|---|---|
| Core Pattern | Ralph Loop | MEOW (molecular workflows) |
| Language | Python (77K) | Go (75K) |
| Decision Making | 6 consensus strategies | Mayor decides |
| LLM Support | Claude, Ollama, Letta, Codex | Claude Code only |
| Observability | OpenTelemetry, Prometheus | Activity feeds |
| UI | CLI | tmux (visual) |
| Recovery | Beads event replay | GUPP + hooks |

vs Dapr Agents

| Aspect | BLACKICE | Dapr Agents |
|---|---|---|
| Target | Single GPU server | Kubernetes scale |
| Consensus | 6 strategies | 3 modes |
| State | SQLite (Beads) | Pluggable stores |
| Complexity | Simpler deployment | Sidecar + control plane |
| Maturity | Custom code | CNCF graduated |

vs CrewAI / AutoGen

| Aspect | BLACKICE | CrewAI / AutoGen |
|---|---|---|
| Focus | Code generation | General agents |
| Consensus | Built-in voting | None |
| Recovery | Beads event store | None |
| Observability | Full stack | Basic logging |
| Git Integration | Worktree isolation | None |

Future Work

Ideas from Gas Town Worth Adopting

| Feature | Priority | Effort |
|---|---|---|
| Convoys (work bundling) | High | Low |
| GUPP (hook propulsion) | High | Medium |
| Patrol Agents (self-healing) | High | Medium |
| MEOW (workflow DSL) | Medium | High |
| tmux UI | Low | Medium |

Roadmap

  1. Convoys - Bundle related tasks for tracking
  2. GUPP - Simplify crash recovery with hooks
  3. Patrol Agents - Self-healing background workers
  4. MEOW - Workflow DSL (Formulas → Molecules)
  5. Federation - Remote workers on cloud
  6. Web UI - Visual dashboard

Generated: January 2026
