Skip to content

Instantly share code, notes, and snippets.

@V1ki
Created April 20, 2026 11:33
Show Gist options
  • Select an option

  • Save V1ki/fce168124d479ad43752ef11d19677d6 to your computer and use it in GitHub Desktop.

Select an option

Save V1ki/fce168124d479ad43752ef11d19677d6 to your computer and use it in GitHub Desktop.
mini-agent-runtime-ts: public event-driven agent runtime prototype with qwen controller

mini-agent-runtime-ts

A minimal event-driven agent runtime built with TypeScript + Bun.

What it demonstrates

  • non-blocking process runtime (start -> stream -> cancel)
  • event bus + JSONL event log
  • tool registry + base tool abstraction
  • local process tool and SSH command tool
  • LLM controller integration for long-running tasks
  • streaming decision loop over tool events

Security note

This public gist contains no API keys and no local vault integration details. To use the real Qwen controller, supply your own API key through the DASHSCOPE_API_KEY environment variable at runtime:

DASHSCOPE_API_KEY=... bun run src/index.ts demo qwen
DASHSCOPE_API_KEY=... bun run src/index.ts comfy-check qwen

Files included

Only the minimal prototype source is included. Runtime logs and local machine artifacts are intentionally excluded.

import { EventBus } from './event-bus';
import { ProcessRuntime } from './process-runtime';
import { ToolRegistry } from './tool-registry';
import { SshCommandTool } from './ssh-command-tool';
import { LocalProcessTool } from './local-process-tool';
import { summarizeRecentEvents } from './summarizer';
import { FakeController } from './fake-controller';
import { QwenController } from './qwen-controller';
import { defaultConfig, type RuntimeConfig } from './runtime-config';
import type { Event, RunHandle } from './types';
import type { ControllerDecision } from './controller-types';
/** Returns the current wall-clock time in epoch milliseconds (`Date.now()`). */
const now = () => Date.now();
/**
 * Snapshot the agent hands to a controller each time it asks for a decision.
 */
type ControllerInput = {
/** Goal text the agent was started with. */
goal: string;
/** Names of the tools registered with the agent. */
availableTools: string[];
/** Condensed view of the most recent bus events. */
recentEvents: Array<Record<string, unknown>>;
/** Run id of the handle the agent currently tracks, or null when it tracks none. */
activeRunId?: string | null;
/** True once the agent has launched at least one process. */
hasStarted?: boolean;
};
/**
 * Minimal controller contract the agent depends on.
 */
type ControllerLike = {
/** Produces the next decision for the given snapshot. */
decide(input: ControllerInput): Promise<ControllerDecision>;
/** Optional quicker decision path; the agent uses it for fast steps when present and falls back to `decide` otherwise. */
fastDecide?: (input: ControllerInput) => Promise<ControllerDecision>;
};
/**
 * Reports whether a bus event is a process output line that mentions one of
 * the known failure markers (case-insensitive). Events that are not stdout or
 * stderr lines are never urgent.
 */
function isUrgentSignal(event: Event) {
  switch (event.type) {
    case 'tool.stdout':
    case 'tool.stderr': {
      const failureMarkers = /Traceback:|ImportError:|ModuleNotFoundError|HTTP 429|still retrying/i;
      return failureMarkers.test(event.chunk);
    }
    default:
      return false;
  }
}
/**
 * Event-driven agent loop: asks a controller for decisions, turns tool
 * decisions into processes started through the runtime, and reacts to the
 * events those processes publish on the bus.
 *
 * Only one decision is in flight at a time; a step requested while another is
 * running is remembered and replayed (as a normal, non-fast step) afterwards.
 */
export class MiniAgent {
  #bus: EventBus;
  #runtime: ProcessRuntime;
  #tools: ToolRegistry;
  #controller: ControllerLike;
  /** Handle of the most recently started run, if any. */
  #active: RunHandle | null = null;
  /** Set once a final event has been published; no further decisions are applied. */
  #done = false;
  #goal = '';
  /** True while a controller decision is being awaited or applied. */
  #deciding = false;
  #hasStarted = false;
  /** True when a step was requested while another one was still running. */
  #stepQueued = false;

  constructor(bus: EventBus, runtime: ProcessRuntime, tools: ToolRegistry, controller: ControllerLike) {
    this.#bus = bus;
    this.#runtime = runtime;
    this.#tools = tools;
    this.#controller = controller;
  }

  /**
   * Builds an agent with the ssh-command and local-process tools registered.
   *
   * @param options.controller 'qwen' selects QwenController; anything else uses FakeController.
   * @param options.config Model settings for the Qwen controller (defaults to `defaultConfig`).
   * @param options.secretResolver Passed through to the Qwen controller to resolve its API key.
   */
  static withDefaultTools(
    bus: EventBus,
    runtime: ProcessRuntime,
    options?: {
      controller?: 'fake' | 'qwen';
      config?: RuntimeConfig;
      secretResolver?: (ref: string) => string | undefined;
    },
  ) {
    const registry = new ToolRegistry();
    registry.register(new SshCommandTool());
    registry.register(new LocalProcessTool());
    const config = options?.config ?? defaultConfig;
    const controller =
      options?.controller === 'qwen'
        ? new QwenController({
            baseUrl: config.model.baseUrl,
            apiKeyRef: config.model.apiKeyRef,
            model: config.model.model,
            secretResolver: options.secretResolver,
          })
        : new FakeController();
    return new MiniAgent(bus, runtime, registry, controller);
  }

  /** Collects the snapshot the controller sees for the next decision. */
  #makeControllerInput(): ControllerInput {
    return {
      goal: this.#goal,
      availableTools: this.#tools.list().map((t) => t.name),
      recentEvents: summarizeRecentEvents(this.#bus.history),
      activeRunId: this.#active?.runId ?? null,
      hasStarted: this.#hasStarted,
    };
  }

  /**
   * Stores the goal and runs the first decision step.
   * Errors from that first step reject the returned promise.
   */
  async start(goal: string) {
    this.#goal = goal;
    await this.#step(false);
  }

  /**
   * Runs one decide-and-apply cycle.
   *
   * @param fast When true and the controller offers `fastDecide`, that path is used.
   */
  async #step(fast: boolean) {
    if (this.#done) return;
    if (this.#deciding) {
      this.#stepQueued = true;
      return;
    }
    this.#deciding = true;
    try {
      const input = this.#makeControllerInput();
      const decision = fast && this.#controller.fastDecide
        ? await this.#controller.fastDecide(input)
        : await this.#controller.decide(input);
      // The agent may have been finalised while the controller was thinking;
      // a late decision must not start or cancel anything.
      if (this.#done) return;
      await this.#apply(decision);
    } finally {
      this.#deciding = false;
      if (this.#stepQueued && !this.#done) {
        this.#stepQueued = false;
        queueMicrotask(() => {
          // Nobody awaits the replayed step, so its failure is routed explicitly.
          void this.#step(false).catch((error: unknown) => this.#fail(error));
        });
      }
    }
  }

  /**
   * Terminal error path for steps nobody awaits: logs the error, publishes a
   * final event describing it and cancels the tracked run. Never rejects.
   */
  async #fail(error: unknown): Promise<void> {
    console.error(error);
    if (this.#done) return;
    this.#done = true;
    const reason = error instanceof Error ? error.message : String(error);
    try {
      this.#bus.publish({ type: 'agent.final', runId: 'agent', ts: now(), message: `agent failed: ${reason}` });
      if (this.#active) await this.#active.cancel();
    } catch (cleanupError: unknown) {
      console.error(cleanupError);
    }
  }

  /**
   * Validates the JSON run spec a tool produced before it reaches the runtime.
   *
   * @throws Error when the spec is not an object, has no string command, or has non-string args.
   */
  #parseRunSpec(toolName: string, output: string): { command: string; args: string[] } {
    const spec: unknown = JSON.parse(output);
    if (typeof spec !== 'object' || spec === null) {
      throw new Error(`${toolName} produced a non-object run spec`);
    }
    const { command, args } = spec as { command?: unknown; args?: unknown };
    if (typeof command !== 'string' || command.length === 0) {
      throw new Error(`${toolName} run spec is missing a command`);
    }
    if (args !== undefined && !(Array.isArray(args) && args.every((arg) => typeof arg === 'string'))) {
      throw new Error(`${toolName} run spec has invalid args`);
    }
    return { command, args: (args as string[] | undefined) ?? [] };
  }

  /** Carries out a controller decision. */
  async #apply(decision: ControllerDecision) {
    if (decision.type === 'wait') {
      this.#bus.publish({ type: 'agent.thought', runId: 'agent', ts: now(), message: decision.rationale ?? 'waiting' });
      return;
    }
    if (decision.type === 'finish') {
      this.#done = true;
      this.#bus.publish({ type: 'agent.final', runId: 'agent', ts: now(), message: decision.message });
      if (this.#active) await this.#active.cancel();
      return;
    }
    if (decision.type === 'cancel_run') {
      // A cancel request for a run the agent is not tracking is ignored.
      if (this.#active?.runId === decision.runId) {
        this.#bus.publish({ type: 'agent.action', runId: 'agent', ts: now(), action: 'cancel', detail: decision.rationale });
        await this.#active.cancel();
        this.#done = true;
        this.#bus.publish({ type: 'agent.final', runId: 'agent', ts: now(), message: decision.rationale ?? 'cancelled' });
      }
      return;
    }
    if (decision.type === 'tool') {
      this.#bus.publish({ type: 'agent.thought', runId: 'agent', ts: now(), message: decision.rationale ?? 'calling tools' });
      for (const call of decision.calls) {
        // Only these two tools produce a run spec the process runtime can start.
        const runnable = call.tool === 'ssh-command' || call.tool === 'local-process';
        const tool = runnable ? this.#tools.get(call.tool) : undefined;
        if (!tool) {
          // Surface the skipped call instead of dropping it without a trace.
          this.#bus.publish({ type: 'agent.thought', runId: 'agent', ts: now(), message: `skipping unsupported tool call: ${call.tool}` });
          continue;
        }
        const result = await tool.execute({}, call.input);
        const spec = this.#parseRunSpec(call.tool, result.output);
        // NOTE(review): a second call replaces the tracked handle without
        // cancelling the earlier run — confirm whether that is intended.
        this.#active = this.#runtime.start(spec.command, spec.args);
        this.#hasStarted = true;
      }
    }
  }

  /**
   * Subscribes to every bus event: echoes it to stdout as JSON and triggers a
   * decision step on process exit, on urgent output, or on startup milestones.
   */
  attach() {
    this.#bus.on('*', (event: Event) => {
      console.log(JSON.stringify(event));
      if (this.#done) return;
      const urgent = event.type === 'tool.exit' || isUrgentSignal(event);
      const milestone =
        !urgent &&
        (event.type === 'tool.stdout' || event.type === 'tool.stderr') &&
        /boot|Starting server|To see the GUI/i.test(event.chunk);
      if (!urgent && !milestone) return;
      // The bus ignores listener return values, so rejections are handled here
      // rather than escaping as unhandled promise rejections.
      void this.#step(urgent).catch((error: unknown) => this.#fail(error));
    });
  }
}
/**
 * Request payload a controller receives when asked for a decision.
 */
export type ControllerRequest = {
/** Goal text for the agent. */
goal: string;
/** Names of the tools the controller may call. */
availableTools: string[];
/** Recent events, already reduced to plain records. */
recentEvents: Array<Record<string, unknown>>;
/** Id of the currently tracked run; may be null or omitted. */
activeRunId?: string | null;
/** Whether a process has been started already; may be omitted. */
hasStarted?: boolean;
};
/**
 * Wrapper carrying a single controller request.
 * NOTE(review): no consumer is visible in this file — presumably a proxy wire format; confirm.
 */
export type ControllerProxyEnvelope = {
request: ControllerRequest;
};
/** One tool invocation requested by a controller. */
export type AgentToolCall = {
/** Registered tool name, e.g. 'ssh-command' or 'local-process'. */
tool: string;
/** Tool-specific input object. */
input: Record<string, unknown>;
};
/**
 * Decision returned by a controller, discriminated by `type`:
 * - 'tool': run the listed tool calls.
 * - 'cancel_run': cancel the run with the given id.
 * - 'wait': do nothing for now.
 * - 'finish': end the agent with a final message.
 */
export type ControllerDecision =
| { type: 'tool'; calls: AgentToolCall[]; rationale?: string }
| { type: 'cancel_run'; runId: string; rationale?: string }
| { type: 'wait'; rationale?: string }
| { type: 'finish'; message: string };
import { EventEmitter } from 'node:events';
import type { Event } from './types';
import { EventLog } from './event-log';
/**
 * In-process publish/subscribe hub that also keeps every published event in
 * memory and, when a log is supplied, appends each event to it.
 */
export class EventBus {
  #history: Event[];
  #emitter: EventEmitter;
  #eventLog?: EventLog;

  /**
   * @param eventLog Optional sink that receives every published event.
   */
  constructor(eventLog?: EventLog) {
    this.#history = [];
    this.#emitter = new EventEmitter();
    this.#emitter.setMaxListeners(100);
    this.#eventLog = eventLog;
  }

  /**
   * Records the event, forwards it to the log, then notifies listeners of the
   * event's own type followed by wildcard ('*') listeners.
   */
  publish(event: Event) {
    this.#history.push(event);
    const sink = this.#eventLog;
    if (sink !== undefined && sink !== null) {
      sink.append(event);
    }
    for (const channel of [event.type, '*']) {
      this.#emitter.emit(channel, event);
    }
  }

  /**
   * Subscribes a listener to one event type or to '*' for all events.
   *
   * @returns A function that removes the listener again.
   */
  on(type: Event['type'] | '*', listener: (event: Event) => void) {
    this.#emitter.addListener(type, listener);
    const unsubscribe = () => this.#emitter.removeListener(type, listener);
    return unsubscribe;
  }

  /** The live array of all events published so far, oldest first. */
  get history() {
    return this.#history;
  }
}
import { mkdirSync, appendFileSync } from 'node:fs';
import { dirname } from 'node:path';
import type { Event } from './types';
/**
 * Append-only JSONL writer: one JSON-serialised event per line.
 */
export class EventLog {
  #path: string;

  /**
   * @param path File to append to; its parent directories are created if missing.
   */
  constructor(path: string) {
    this.#path = path;
    const parentDir = dirname(path);
    mkdirSync(parentDir, { recursive: true });
  }

  /** Synchronously appends the event as a single UTF-8 line. */
  append(event: Event) {
    const line = `${JSON.stringify(event)}\n`;
    appendFileSync(this.#path, line, 'utf8');
  }
}
import type { ControllerDecision } from './controller-types';
/**
 * Deterministic stand-in for an LLM controller, driven purely by substring
 * checks on the goal and on the JSON-serialised recent events.
 */
export class FakeController {
  /**
   * Chooses the next decision:
   * - before anything has started: launch the comfy check (goal mentions
   *   'comfy') or the local demo process;
   * - afterwards: cancel on failure markers while a run id is present, finish
   *   on the GUI milestone or on process exit, otherwise wait.
   */
  async decide(input: {
    goal: string;
    availableTools: string[];
    recentEvents: Array<Record<string, unknown>>;
    activeRunId?: string | null;
    hasStarted?: boolean;
  }): Promise<ControllerDecision> {
    const serializedEvents = JSON.stringify(input.recentEvents);
    const mentions = (needle: string) => serializedEvents.includes(needle);

    if (!input.hasStarted) {
      const wantsComfyCheck = input.goal.includes('comfy');
      if (!wantsComfyCheck) {
        const demoScript =
          "import time; print('boot'); time.sleep(1); print('Traceback: config missing'); time.sleep(5); print('still retrying')";
        return {
          type: 'tool',
          rationale: 'start demo process',
          calls: [{ tool: 'local-process', input: { command: 'python3', args: ['-u', '-c', demoScript] } }],
        };
      }
      const remoteCommand =
        'bash -lc "source /home/yeestor/miniconda3/etc/profile.d/conda.sh && conda activate comfyui_qwen && cd /home/yeestor/comfyui-qwen-image/ComfyUI && timeout 20s python main.py --listen 127.0.0.1 --port 8188"';
      return {
        type: 'tool',
        rationale: 'start remote comfy check',
        calls: [{ tool: 'ssh-command', input: { host: 'yeestor@172.18.8.200', command: remoteCommand } }],
      };
    }

    // Failure markers win over every other signal while a run id is known.
    if (input.activeRunId) {
      if (mentions('Traceback:') || mentions('still retrying')) {
        return { type: 'cancel_run', runId: input.activeRunId, rationale: 'root cause exposed early' };
      }
    }

    if (mentions('To see the GUI go to: http://127.0.0.1:8188')) {
      return { type: 'finish', message: 'ComfyUI reached server start milestone' };
    }

    if (mentions('"type":"tool.exit"')) {
      return { type: 'finish', message: 'process exited' };
    }

    return { type: 'wait', rationale: 'still observing active run' };
  }
}
import { EventLog } from './event-log';
import { EventBus } from './event-bus';
import { ProcessRuntime } from './process-runtime';
import { MiniAgent } from './agent';
// CLI entry point: `bun run src/index.ts [demo|comfy-check] [fake|qwen]`.
const mode = Bun.argv[2] ?? 'demo';

// Validate the controller argument instead of blindly casting it; an unknown
// value still selects the fake controller, but no longer does so silently.
const requestedController = Bun.argv[3] ?? 'fake';
if (requestedController !== 'fake' && requestedController !== 'qwen') {
  console.error(`Unknown controller "${requestedController}"; falling back to "fake".`);
}
const controllerMode: 'fake' | 'qwen' = requestedController === 'qwen' ? 'qwen' : 'fake';

// URL.pathname stays percent-encoded (e.g. spaces become %20), so convert the
// file URL to a real filesystem path before handing it to the log.
const log = new EventLog(Bun.fileURLToPath(new URL('../runs/latest.jsonl', import.meta.url)));
const bus = new EventBus(log);
const runtime = new ProcessRuntime(bus);
const agent = MiniAgent.withDefaultTools(bus, runtime, {
  controller: controllerMode,
  // Secrets are looked up in the environment, upper-cased name first.
  secretResolver: (ref: string) => process.env[ref.toUpperCase()] ?? process.env[ref],
});
agent.attach();

// Subscribe before starting: the very first decision may already be final,
// and a listener registered afterwards would never see that event.
const finished = new Promise<void>((resolve) => {
  const off = bus.on('agent.final', () => {
    off();
    // Short grace period before the script ends.
    setTimeout(resolve, 100);
  });
});

if (mode === 'comfy-check') {
  await agent.start('Use ssh-command exactly once with host=yeestor@172.18.8.200 and command=bash -lc "source /home/yeestor/miniconda3/etc/profile.d/conda.sh && conda activate comfyui_qwen && cd /home/yeestor/comfyui-qwen-image/ComfyUI && timeout 20s python main.py --listen 127.0.0.1 --port 8188". Do not use any other command. Wait during normal startup logs. If recent events include "To see the GUI go to: http://127.0.0.1:8188", return finish immediately. If a clear root cause appears, return cancel_run.');
} else {
  await agent.start('Use local-process exactly once with command=python3 and args=["-u","-c","import time; print(\'boot\'); time.sleep(1); print(\'Traceback: config missing\'); time.sleep(60); print(\'still retrying\')"]. Do not use any other command. As soon as recent events include Traceback: config missing, immediately return cancel_run for the active run while it is still alive.');
}
await finished;
import { BaseTool, type ToolCallContext, type ToolResult } from './tool';
/** Input accepted by the local-process tool. */
export type LocalProcessInput = {
  /** Executable to run. */
  command: string;
  /** Optional argument list. */
  args?: string[];
};

/**
 * Tool that does not run anything itself: it echoes its input back as JSON so
 * the caller can hand it to the process runtime.
 */
export class LocalProcessTool extends BaseTool<LocalProcessInput> {
  name = 'local-process';
  description = 'Describe a local process run request for ProcessRuntime. Input must use {command:string,args?:string[]} and command must be only the executable name, never a whole shell line.';

  /**
   * @param _ctx Unused.
   * @param input Run request to serialise.
   * @returns A successful result whose `output` is the JSON form of `input`.
   */
  async execute(_ctx: ToolCallContext, input: LocalProcessInput): Promise<ToolResult> {
    const serializedRequest = JSON.stringify(input);
    const summary = `Prepared local process ${input.command}`;
    const result: ToolResult = {
      success: true,
      output: serializedRequest,
      outputSummary: summary,
    };
    return result;
  }
}
{
"name": "mini-agent-runtime-ts",
"private": true,
"type": "module",
"scripts": {
"dev": "bun run src/index.ts",
"demo": "bun run src/index.ts demo",
"demo:qwen": "bun run src/index.ts demo qwen",
"comfy-check": "bun run src/index.ts comfy-check",
"comfy-check:qwen": "bun run src/index.ts comfy-check qwen"
}
}
import { randomUUID } from 'node:crypto';
import { spawn, type ChildProcessWithoutNullStreams } from 'node:child_process';
import { constants as osConstants } from 'node:os';
import { createInterface } from 'node:readline';
import { EventBus } from './event-bus';
import type { RunHandle } from './types';
/** Returns the current wall-clock time in epoch milliseconds. */
const now = () => Date.now();

/**
 * Starts child processes without blocking and reports their lifecycle
 * (started, stdout/stderr lines, exit, cancelled) as events on the bus.
 */
export class ProcessRuntime {
  #bus: EventBus;
  /** Child processes that have been started and have not yet exited, keyed by run id. */
  #runs = new Map<string, ChildProcessWithoutNullStreams>();

  constructor(bus: EventBus) {
    this.#bus = bus;
  }

  /**
   * Spawns `command` with `args` (stdin ignored, stdout/stderr piped) and
   * publishes its output line by line.
   *
   * A process that cannot be spawned at all (for example an unknown
   * executable) is reported as a stderr line plus a `tool.exit` event with
   * exit code -1 rather than as an uncaught 'error' event.
   *
   * @param command Executable to run.
   * @param args Arguments passed to the executable.
   * @param cwd Optional working directory.
   * @returns A handle with the generated run id and a `cancel` function that
   *   sends SIGTERM while the process is still running.
   */
  start(command: string, args: string[] = [], cwd?: string): RunHandle {
    const runId = `run_${randomUUID().slice(0, 8)}`;
    const proc = spawn(command, args, {
      cwd,
      stdio: ['ignore', 'pipe', 'pipe'],
      env: process.env,
    });
    this.#runs.set(runId, proc);
    this.#bus.publish({ type: 'tool.started', runId, ts: now(), command: [command, ...args].join(' ') });

    // Exit may be reported from either the 'exit' or the 'error' path; make
    // sure subscribers see exactly one tool.exit per run.
    let exitPublished = false;
    const publishExit = (exitCode: number, signalCode: number | null) => {
      if (exitPublished) return;
      exitPublished = true;
      this.#runs.delete(runId);
      this.#bus.publish({ type: 'tool.exit', runId, ts: now(), exitCode, signalCode });
    };

    const out = createInterface({ input: proc.stdout });
    out.on('line', (line: string) => this.#bus.publish({ type: 'tool.stdout', runId, ts: now(), chunk: line }));
    const err = createInterface({ input: proc.stderr });
    err.on('line', (line: string) => this.#bus.publish({ type: 'tool.stderr', runId, ts: now(), chunk: line }));

    proc.on('error', (error: Error) => {
      this.#bus.publish({ type: 'tool.stderr', runId, ts: now(), chunk: `process error: ${error.message}` });
      // Without a pid the process never started, so no 'exit' event will follow.
      if (proc.pid === undefined) publishExit(-1, null);
    });
    // NOTE(review): 'exit' can fire before the last output lines have been
    // delivered; switching to 'close' would order them — confirm desired behaviour.
    proc.on('exit', (code: number | null, signalCode: NodeJS.Signals | null) => {
      // Report the numeric signal; NaN is kept only for names the platform table lacks.
      const signalNumber = signalCode ? osConstants.signals[signalCode] ?? Number.NaN : null;
      publishExit(code ?? -1, signalNumber);
    });

    return {
      runId,
      cancel: async () => {
        const child = this.#runs.get(runId);
        if (!child) return;
        child.kill('SIGTERM');
        this.#bus.publish({ type: 'tool.cancelled', runId, ts: now() });
      },
    };
  }
}
import type { ControllerDecision } from './controller-types';
import type { ControllerRequest } from './controller-protocol';
/** Construction options for QwenController. */
export type QwenControllerOptions = {
/** API base URL; a single trailing slash is stripped and `/chat/completions` is appended per request. */
baseUrl: string;
/** Reference handed to `secretResolver` to obtain the API key. */
apiKeyRef: string;
/** Model name sent in the request body. */
model: string;
/** Resolves a secret reference to its value; requests throw when it is missing or returns nothing. */
secretResolver?: (ref: string) => string | undefined;
};
/**
 * Produces the single-line description of tool input shapes and allowed
 * decision shapes that is embedded in the controller's system prompt.
 */
function buildDecisionSchemaText() {
  const toolInputSchemas = [
    'Available tool input schemas:',
    'local-process => {"command":"executable only","args":["arg1","arg2"]}',
    'ssh-command => {"host":"user@host","command":"remote shell command"}',
    'NEVER put a whole shell command line into local-process.command.',
  ];
  const decisionSchemas = [
    'Decision schema only:',
    '{"type":"tool","rationale":"...","calls":[{"tool":"name","input":{...}}]}',
    '{"type":"cancel_run","runId":"...","rationale":"..."}',
    '{"type":"wait","rationale":"..."}',
    '{"type":"finish","message":"..."}',
  ];
  return [...toolInputSchemas, ...decisionSchemas].join(' ');
}
/**
 * Builds the chat messages for a full decision request: a system prompt with
 * the decision schema and hard rules, and a user message carrying the request
 * as JSON (missing `activeRunId` becomes null, missing `hasStarted` becomes false).
 */
export function buildQwenMessages(input: ControllerRequest) {
  const promptParts = [
    'You are the controller of a long-running agent runtime.',
    'Return strict JSON only.',
    buildDecisionSchemaText(),
    'Hard rule: if recent events already include a clear root cause such as Traceback:, ImportError:, ModuleNotFoundError, HTTP 429, or still retrying, and there is an active run, return cancel_run immediately in this same response. Do not return wait first.',
    'Hard rule: if recent events include "To see the GUI go to: http://127.0.0.1:8188", return finish immediately.',
    'If the run is only showing normal startup/progress logs, return wait.',
    'Do not explain chain of thought.',
  ];
  const requestPayload = {
    goal: input.goal,
    availableTools: input.availableTools,
    recentEvents: input.recentEvents,
    activeRunId: input.activeRunId ?? null,
    hasStarted: input.hasStarted ?? false,
  };
  const systemMessage = { role: 'system', content: promptParts.join(' ') };
  const userMessage = { role: 'user', content: JSON.stringify(requestPayload) };
  return [systemMessage, userMessage];
}
/**
 * Builds the chat messages for the fast decision path: a terse system prompt
 * limited to cancel_run / finish / wait, and a user message carrying only the
 * active run id (null when absent) and the recent events as JSON.
 */
export function buildFastPathMessages(input: ControllerRequest) {
  const promptParts = [
    'You are a fast-path runtime controller.',
    'Return strict JSON only.',
    'Allowed outputs only:',
    '{"type":"cancel_run","runId":"...","rationale":"..."}',
    '{"type":"finish","message":"..."}',
    '{"type":"wait","rationale":"..."}',
    'If recent events contain any of: Traceback:, ImportError:, ModuleNotFoundError, HTTP 429, still retrying, and activeRunId is present, return cancel_run immediately.',
    'If recent events contain: To see the GUI go to: http://127.0.0.1:8188 return finish immediately.',
    'Otherwise return wait.',
    'Be terse.',
  ];
  const requestPayload = {
    activeRunId: input.activeRunId ?? null,
    recentEvents: input.recentEvents,
  };
  const systemMessage = { role: 'system', content: promptParts.join(' ') };
  const userMessage = { role: 'user', content: JSON.stringify(requestPayload) };
  return [systemMessage, userMessage];
}
/**
 * Controller backed by an OpenAI-compatible chat-completions endpoint.
 * Each decision is one POST request; the model's reply is validated against
 * the decision schema before it is returned.
 */
export class QwenController {
  #baseUrl: string;
  #apiKeyRef: string;
  #model: string;
  #secretResolver?: (ref: string) => string | undefined;

  constructor(options: QwenControllerOptions) {
    // Strip one trailing slash so the endpoint path can be appended safely.
    this.#baseUrl = options.baseUrl.replace(/\/$/, '');
    this.#apiKeyRef = options.apiKeyRef;
    this.#model = options.model;
    this.#secretResolver = options.secretResolver;
  }

  /**
   * Parses and validates model output. The reply is untrusted text, so its
   * shape is checked here instead of being cast blindly.
   *
   * @throws Error when the content is not JSON, not an object, has an unknown
   *   `type`, or lacks the fields that decision type requires.
   */
  #parseDecision(content: string): ControllerDecision {
    let raw: unknown;
    try {
      raw = JSON.parse(content);
    } catch {
      throw new Error('QwenController returned invalid JSON');
    }
    if (typeof raw !== 'object' || raw === null || Array.isArray(raw)) {
      throw new Error('QwenController decision must be a JSON object');
    }
    const candidate = raw as Record<string, unknown>;
    switch (candidate.type) {
      case 'tool': {
        const calls = candidate.calls;
        const wellFormed =
          Array.isArray(calls) &&
          calls.every(
            (call: unknown) =>
              typeof call === 'object' &&
              call !== null &&
              typeof (call as Record<string, unknown>).tool === 'string' &&
              typeof (call as Record<string, unknown>).input === 'object' &&
              (call as Record<string, unknown>).input !== null,
          );
        if (!wellFormed) throw new Error('QwenController tool decision has malformed calls');
        break;
      }
      case 'cancel_run':
        if (typeof candidate.runId !== 'string') {
          throw new Error('QwenController cancel_run decision is missing runId');
        }
        break;
      case 'finish':
        if (typeof candidate.message !== 'string') {
          throw new Error('QwenController finish decision is missing message');
        }
        break;
      case 'wait':
        break;
      default:
        throw new Error(`QwenController returned unknown decision type: ${String(candidate.type)}`);
    }
    return raw as ControllerDecision;
  }

  /**
   * Sends the messages to `<baseUrl>/chat/completions` and returns the decision.
   *
   * @throws Error when no secret resolver or API key is available, the HTTP
   *   status is not OK, the reply has no content, or the content is not a
   *   valid decision.
   */
  async #call(messages: Array<{ role: string; content: string }>): Promise<ControllerDecision> {
    if (!this.#secretResolver) throw new Error('QwenController requires secretResolver');
    const apiKey = this.#secretResolver(this.#apiKeyRef);
    if (!apiKey) throw new Error(`Missing secret for ref: ${this.#apiKeyRef}`);
    const response = await fetch(`${this.#baseUrl}/chat/completions`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        Authorization: `Bearer ${apiKey}`,
      },
      body: JSON.stringify({
        model: this.#model,
        messages,
        response_format: { type: 'json_object' },
      }),
    });
    if (!response.ok) throw new Error(`QwenController HTTP ${response.status}`);
    const body = (await response.json()) as { choices?: Array<{ message?: { content?: string } }> };
    const content = body.choices?.[0]?.message?.content;
    if (!content) throw new Error('QwenController missing content');
    return this.#parseDecision(content);
  }

  /** Full decision path using the complete prompt. */
  async decide(input: ControllerRequest): Promise<ControllerDecision> {
    return this.#call(buildQwenMessages(input));
  }

  /** Fast decision path using the reduced prompt. */
  async fastDecide(input: ControllerRequest): Promise<ControllerDecision> {
    return this.#call(buildFastPathMessages(input));
  }
}
/** Runtime configuration; currently only the model endpoint settings. */
export type RuntimeConfig = {
model: {
/** Base URL of the chat-completions API. */
baseUrl: string;
/** Name of the secret reference that holds the API key (not the key itself). */
apiKeyRef: string;
/** Model identifier to request. */
model: string;
};
};
/** Default configuration: the DashScope compatible-mode endpoint, with the key referenced as DASHSCOPE_API_KEY. */
export const defaultConfig: RuntimeConfig = {
model: {
baseUrl: 'https://dashscope.aliyuncs.com/compatible-mode/v1',
apiKeyRef: 'DASHSCOPE_API_KEY',
model: 'qwen3.6-plus',
},
};
import { BaseTool, type ToolCallContext, type ToolResult } from './tool';
import { buildSshCommand } from './tools';
/** Input accepted by the ssh-command tool. */
export type SshCommandInput = {
  /** Remote target, e.g. `user@host`. */
  host: string;
  /** Shell command to run on the remote host. */
  command: string;
};

/**
 * Tool that does not connect anywhere itself: it builds the ssh invocation and
 * returns it as JSON so the caller can hand it to the process runtime.
 */
export class SshCommandTool extends BaseTool<SshCommandInput> {
  name = 'ssh-command';
  description = 'Build an SSH command invocation for remote execution. Input must use {host:string,command:string}.';

  /**
   * @param _ctx Unused.
   * @param input Remote host and command.
   * @returns A successful result whose `output` is the JSON form of the ssh invocation.
   */
  async execute(_ctx: ToolCallContext, input: SshCommandInput): Promise<ToolResult> {
    const invocation = buildSshCommand(input.host, input.command);
    const result: ToolResult = {
      success: true,
      output: JSON.stringify(invocation),
      outputSummary: `Built ssh command for ${input.host}`,
    };
    return result;
  }
}
import type { Event } from './types';
/**
 * Returns a condensed view of the last `limit` events for use in controller prompts.
 *
 * Process events are reduced to their essential fields (timestamps and signal
 * codes are dropped); all other events are passed through unchanged.
 *
 * @param events Full event history, oldest first.
 * @param limit Maximum number of trailing events to include. Zero, negative
 *   or NaN yields an empty list; fractional values are rounded down.
 */
export function summarizeRecentEvents(events: Event[], limit = 6) {
  // slice(-0) is slice(0) and would return the whole history, so a
  // non-positive limit has to be handled explicitly.
  const count = Math.floor(limit);
  const recent = count > 0 ? events.slice(-count) : [];
  return recent.map((e) => {
    switch (e.type) {
      case 'tool.stdout':
      case 'tool.stderr':
        return { type: e.type, runId: e.runId, chunk: e.chunk };
      case 'tool.started':
        return { type: e.type, runId: e.runId, command: e.command };
      case 'tool.exit':
        return { type: e.type, runId: e.runId, exitCode: e.exitCode };
      default:
        return e;
    }
  });
}
import { BaseTool } from './tool';
/**
 * Name-indexed collection of tools. Registering a tool under a name that is
 * already present replaces the earlier entry.
 */
export class ToolRegistry {
  #byName: Map<string, BaseTool> = new Map();

  /** Adds the tool under its own `name`. */
  register(tool: BaseTool) {
    const key = tool.name;
    this.#byName.set(key, tool);
  }

  /** Returns the tool registered under `name`, or undefined when there is none. */
  get(name: string) {
    const found = this.#byName.get(name);
    return found;
  }

  /** Returns a fresh array of all registered tools in registration order. */
  list() {
    return Array.from(this.#byName.values());
  }

  /** Reports whether a tool is registered under `name`. */
  has(name: string) {
    return this.#byName.has(name);
  }
}
/** Per-call context passed to a tool. */
export type ToolCallContext = {
/** Optional working directory for the call. */
workDir?: string;
};
/** Outcome of a tool call. */
export type ToolResult = {
/** Whether the tool considers the call successful. */
success: boolean;
/** Main result text; the tools in this project put JSON here. */
output: string;
/** Optional short human-readable description of the result. */
outputSummary?: string;
};
/**
 * Base class every tool extends.
 * `TInput` is the shape of the input object the tool accepts.
 */
export abstract class BaseTool<TInput = unknown> {
/** Unique tool name; used as the registry key. */
abstract name: string;
/** Description of what the tool does and what input it expects. */
abstract description: string;
/** Performs the tool call and resolves with its result. */
abstract execute(ctx: ToolCallContext, input: TInput): Promise<ToolResult>;
}
/**
 * Builds the argv for running `remoteCommand` on `host` through ssh.
 *
 * The host can originate from model output, so it is validated first: a value
 * beginning with '-' would be parsed by ssh as an option (for example
 * `-oProxyCommand=...`) instead of a destination.
 *
 * @param host Remote destination such as `user@host`.
 * @param remoteCommand Command line executed by the remote shell.
 * @returns The executable name and its argument list.
 * @throws Error when `host` is empty, starts with '-', or contains whitespace.
 */
export function buildSshCommand(host: string, remoteCommand: string) {
  if (typeof host !== 'string' || host.length === 0 || host.startsWith('-') || /\s/.test(host)) {
    throw new Error(`Invalid ssh host: ${JSON.stringify(host)}`);
  }
  return {
    command: 'ssh',
    // NOTE(review): StrictHostKeyChecking=no disables host key verification and
    // permits man-in-the-middle attacks; consider 'accept-new' outside prototypes.
    args: ['-o', 'StrictHostKeyChecking=no', host, remoteCommand],
  };
}
{
"compilerOptions": {
"target": "ES2022",
"module": "ESNext",
"moduleResolution": "Bundler",
"strict": true,
"skipLibCheck": true
}
}
/**
 * Every event that travels over the bus, discriminated by `type`.
 * `tool.*` events carry the run id of the process they belong to;
 * `agent.*` events always use the fixed run id 'agent'.
 * `ts` is a timestamp in epoch milliseconds.
 */
export type Event =
// A process was started; `command` is the executable and its arguments joined by spaces.
| { type: 'tool.started'; runId: string; ts: number; command: string }
// One line of standard output.
| { type: 'tool.stdout'; runId: string; ts: number; chunk: string }
// One line of standard error.
| { type: 'tool.stderr'; runId: string; ts: number; chunk: string }
// The process ended; `signalCode` is null when no terminating signal was reported.
| { type: 'tool.exit'; runId: string; ts: number; exitCode: number; signalCode?: number | null }
// Cancellation of the run was requested.
| { type: 'tool.cancelled'; runId: string; ts: number }
// Rationale or status note from the agent.
| { type: 'agent.thought'; runId: 'agent'; ts: number; message: string }
// An action the agent is taking, with optional detail.
| { type: 'agent.action'; runId: 'agent'; ts: number; action: string; detail?: string }
// Terminal message; the agent makes no further decisions after it.
| { type: 'agent.final'; runId: 'agent'; ts: number; message: string };
/** Handle to a started run. */
export type RunHandle = {
/** Identifier shared by all events of this run. */
runId: string;
/** Requests termination of the run. */
cancel: () => Promise<void>;
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment