@1stvamp
Created June 15, 2026 00:17
Trigger.dev issue #1 execution snapshot POC: benchmark plan, Phase 0/1 results, and harness scripts (phase0 CAS+HOT/bloat, phase1 S3/MinIO/SeaweedFS, soakD long-soak)

POC benchmark plan: scalable execution snapshots

Goal: de-risk the head-row + S3-log design with the smallest set of experiments that could actually change it, and decide the schema-growth strategy from data rather than assumption.

Ground rules

  • Two kinds of result, never mixed. Shape / correctness / relative results (HOT-update ratio, bloat trend, CAS correctness, MinIO-vs-SeaweedFS comparison) are valid in containers on modest hardware. Absolute throughput/latency needs a production-representative instance and same-region S3; a laptop number there is fiction. Every chart is labelled with which kind of result it shows.
  • Earn the complexity. Start from the simplest viable schema (one unpartitioned table, delete-on-terminal, single primary). Partitioning, forward-migration, and sharding are each gated behind a specific metric that fails without them. If the simple thing holds at target scale, we ship the simple thing.
  • Representative workload. Synthesize the real shape: per-run transition counts of 5 to 20+, a create/transition/complete mix derived from ~1B runs/month, and a "sleeper" cohort (a few percent of runs left open for the whole window) to model the wait()-for-weeks runs.
  • Reproducible. Pinned versions, scripted harness, fixed seeds, warmup then a steady-state measurement window, report p50/p99/p999 (not means), repeat trials, capture raw data.
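phase0.mjs draws run lengths and sleeper membership from Math.random, which cannot be seeded, so the "fixed seeds" rule needs a seedable generator. A minimal sketch, assuming a small non-cryptographic PRNG is adequate for workload synthesis: mulberry32 is a widely used public-domain 32-bit generator, while the SEED environment variable and the makeRandInt helper are hypothetical, not part of the harness.

```javascript
// mulberry32: a tiny seedable 32-bit PRNG (NOT cryptographically secure).
// Same seed -> same sequence, so a synthetic workload can be replayed exactly.
function mulberry32(seed) {
  let a = seed >>> 0; // state kept as an unsigned 32-bit integer
  return function next() {
    a = (a + 0x6d2b79f5) >>> 0;
    let t = a;
    t = Math.imul(t ^ (t >>> 15), t | 1);
    t ^= t + Math.imul(t ^ (t >>> 7), t | 61);
    return ((t ^ (t >>> 14)) >>> 0) / 4294967296; // float in [0, 1)
  };
}

// Drop-in replacement for the harness's randInt(lo, hi), inclusive on both ends.
function makeRandInt(rand) {
  return (lo, hi) => lo + Math.floor(rand() * (hi - lo + 1));
}

// Hypothetical usage: take the seed from the environment and record it per trial.
const seed = Number(process.env.SEED || 12345);
const rand = mulberry32(seed);
const randInt = makeRandInt(rand);
const target = randInt(5, 20); // transitions before complete
const parked = rand() < 0.02;  // sleeper cohort membership
```

Logging the seed alongside each trial's raw data is what makes a run replayable.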

Workstream A: Postgres head row + growth management

What we're proving: the head row stays small, updates stay HOT, and growth is manageable with the least machinery.

A0: Is the simplest schema enough? (cheapest, highest-information)

  • Setup: single run_head table, PK on run_id, no partitioning, delete-on-terminal. Tune fillfactor. Postgres pinned (e.g. 16).
  • Workload: drive create (INSERT) / transition (CAS UPDATE) / complete (DELETE) at steady state for a multi-hour window, at active-run counts of 1e5, 1e6, 1e7. Hold a sleeper cohort open.
  • Measure: HOT-update ratio (n_tup_hot_upd vs n_tup_upd), table + index bloat over time (pgstattuple), autovacuum keeping up (dead tuples, autovacuum runs/duration), per-op p50/p99, steady-state table/index size vs active-run count.
  • Pass: HOT ratio near 100%, bloat flat, size tracks the active set not cumulative history. If it passes, partitioning is unnecessary and we stop here (simplest wins). If autovacuum can't keep up or bloat climbs, escalate to A1.
  • Variables: active-run count, fillfactor (70/80/90/100, to show HOT sensitivity), sleeper %.
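The A0 measurements reduce to deltas of cumulative counters plus a size division. A minimal sketch: the function name a0Metrics is hypothetical, while the column names are the real pg_stat_user_tables columns the harness queries. With the 100k-active ff90 sizes from the results (9.7 MB heap + 3.9 MB index over 100,000 rows) it yields 136 B per active row.

```javascript
// Counters in pg_stat_user_tables are cumulative, so a test window's numbers are
// (after - before). node-postgres returns bigint columns as strings, hence Number().
function a0Metrics(before, after, size, activeRows) {
  const d = (k) => Number(after[k] || 0) - Number(before[k] || 0);
  const upd = d("n_tup_upd");
  const hot = d("n_tup_hot_upd");
  return {
    hotRatio: upd > 0 ? hot / upd : null, // want close to 1.0
    autovacuums: d("autovacuum_count"),
    // heap (pg_table_size) + indexes (pg_indexes_size), per active row
    bytesPerActiveRow: (Number(size.heap) + Number(size.idx)) / activeRows,
  };
}
```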

A1: Partition-drop retention (only if A0 fails)

  • Setup: range-partition run_head by created_at; retention via DETACH PARTITION CONCURRENTLY + DROP.
  • Compare: partition-drop reclamation vs bulk DELETE + autovacuum (reclaim time, lock held, bloat after, insert p99 during retention).
  • Sleeper handling: forward-migrate the open rows out of the oldest partition, then drop it; measure how many rows move and the cost.
  • Pass: drop is sub-second and lock-light; insert p99 stays flat during retention; no vacuum storm. This is the "real solution for table growth" only if A0 says we need it.
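A sketch of the DDL A1 would exercise, assuming monthly range partitions on created_at and a partition naming scheme invented here for illustration. One real constraint to plan for: Postgres requires a partitioned table's primary key to include the partition column, so A1's run_head would need PRIMARY KEY (run_id, created_at) rather than A0's PK on run_id alone. Sleeper forward-migration has to finish before the retire statements run.

```javascript
// Hypothetical helper: builds the CREATE and retire statements for one monthly
// partition of `parent`. month is 1-12; bounds are [first of month, first of next).
function monthPartition(parent, year, month) {
  const pad2 = (n) => String(n).padStart(2, "0");
  const next = month === 12 ? { y: year + 1, m: 1 } : { y: year, m: month + 1 };
  const name = `${parent}_p${year}_${pad2(month)}`;
  return {
    name,
    create:
      `CREATE TABLE ${name} PARTITION OF ${parent} ` +
      `FOR VALUES FROM ('${year}-${pad2(month)}-01') TO ('${next.y}-${pad2(next.m)}-01')`,
    // DETACH ... CONCURRENTLY (Postgres 14+) cannot run inside a transaction block,
    // so issue these as two separate statements, not inside BEGIN/COMMIT.
    retire: [
      `ALTER TABLE ${parent} DETACH PARTITION ${name} CONCURRENTLY`,
      `DROP TABLE ${name}`,
    ],
  };
}
```

For example, monthPartition("run_head", 2026, 12).create is "CREATE TABLE run_head_p2026_12 PARTITION OF run_head FOR VALUES FROM ('2026-12-01') TO ('2027-01-01')", showing the year rollover.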

A2: CAS correctness (correctness, not perf)

  • Setup: N concurrent writers racing the same run through the head-row CAS.
  • Assert: exactly one writer wins each step, seq is gap-free and never forks, even with induced redlock expiry (kill/expire the lock mid-transition) to prove the CAS is a genuine backstop, not redundant with the lock.
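The invariant A2 checks (one winner per step, no gaps, no forks) can be illustrated without a database. This is a minimal in-memory sketch, not the A2 harness: the event loop stands in for Postgres's row-level atomicity, because the check and the write below have no await between them, much as UPDATE ... SET seq = seq + 1 WHERE run_id = $1 AND seq = $2 is applied atomically to the row. The raceCas name is hypothetical.

```javascript
// In-memory model of the head-row CAS: writers read seq, yield (as a network
// round-trip would), then attempt "set seq = seen + 1 only if seq is still seen".
async function raceCas(writers, target) {
  const row = { seq: 0 };
  const winners = new Map(); // new seq -> number of writers that claimed it
  let conflicts = 0;

  // Check-and-write with no await in between, so it cannot interleave.
  const cas = (expected) => {
    if (row.seq !== expected) return null; // like "0 rows updated": lost the race
    row.seq = expected + 1;
    return row.seq;
  };

  async function writer() {
    for (;;) {
      const seen = row.seq;    // like SELECT seq ...
      if (seen >= target) return;
      await Promise.resolve(); // yield: other writers may advance seq meanwhile
      const ns = cas(seen);
      if (ns === null) { conflicts++; continue; } // retry with a fresh read
      winners.set(ns, (winners.get(ns) || 0) + 1);
    }
  }

  await Promise.all(Array.from({ length: writers }, () => writer()));
  let dupes = 0, gaps = 0;
  for (const c of winners.values()) if (c > 1) dupes++;
  for (let i = 1; i <= target; i++) if (!winners.has(i)) gaps++;
  return { finalSeq: row.seq, dupes, gaps, conflicts };
}
```

raceCas(16, 5000) should report finalSeq 5000 with 0 dupes and 0 gaps; the conflict count depends on interleaving. It models contention only, not the induced redlock expiry A2 also calls for.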

Tooling: custom load generator or pgbench custom scripts; pg_stat_user_tables, pgstattuple, pg_stat_activity, auto_explain.


Workstream B: S3 / object-storage write and read paths

What we're proving: the body-log write path scales, the batching tradeoff is real, and the cold read path is acceptable.

  • B1 (write latency/throughput): PUT small objects (1/4/16/64 KB) across many run prefixes at increasing concurrency. Measure PUT p50/p99/p999, throughput, and 503/SlowDown throttle rate.
  • B2 (batching tradeoff): vary batch factor (1, 10, 20, 50 snapshots/object). Measure PUT request count, extrapolated monthly cost, and the buffering latency added to the write path.
  • B3 (cold reads): "all states" via prefix ListObjectsV2, and "since N" via start-after, at per-run object counts of 10/100/1000/5000. Measure LIST pages, total latency, and GET fan-out. This locates where the cold path would get expensive for very long runs.
  • B4 (consistency): write-then-immediately list/get to confirm read-after-write and list-after-write on each backend.
  • Variables: object size, concurrency, prefix fan-out, batch factor, per-run object count.
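Both cold reads rely on S3 listing keys in lexicographic (UTF-8 binary) order, so the seq part of each key must be zero-padded for that order to match numeric order; the phase1 harness pads to 8 digits. A sketch of a per-run key scheme and the matching ListObjectsV2 parameters, where the runs/<runId>/ layout, the 12-digit width, and the helper names are assumptions:

```javascript
const SEQ_WIDTH = 12; // assumption: more digits than any run's snapshot count needs
const runPrefix = (runId) => `runs/${runId}/`;
const snapshotKey = (runId, seq) =>
  `${runPrefix(runId)}${String(seq).padStart(SEQ_WIDTH, "0")}`;

// Parameters for ListObjectsV2Command (@aws-sdk/client-s3).
// "all states": Prefix only. "since N": Prefix plus StartAfter = key of seq N,
// so S3 returns only keys that sort strictly after it.
function listParams(bucket, runId, sinceSeq) {
  const params = { Bucket: bucket, Prefix: runPrefix(runId) };
  if (sinceSeq !== undefined) params.StartAfter = snapshotKey(runId, sinceSeq);
  return params;
}
```

Pass the result to new ListObjectsV2Command(...) and page with ContinuationToken, as the harness's listAll does.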

Workstream C: S3 vs MinIO vs SeaweedFS

What we're proving: which self-hostable backend fits this workload (many small objects, high write rate, prefix listing), plus self-host correctness.

  • Setup: run the B1/B3/B4 harness against AWS S3 (managed baseline, same-region if possible), MinIO (container), and SeaweedFS (container). Equalize hardware for the two self-hosted backends; treat S3 as a baseline for behaviour, not an absolute comparison against laptop containers.
  • Hypothesis to test: SeaweedFS, built for large numbers of small files, may beat MinIO on small-object PUT throughput and on LIST latency at high object counts. Verify rather than assume. Note MinIO CE's maintenance-only status as an operational risk regardless of the numbers.
  • Capture: small-object PUT throughput, LIST latency as object count grows, read-after-write and list-after-write correctness, and operational notes (deploy complexity, memory, consistency caveats).
  • Caveat: container-on-one-box numbers are relative only; absolute figures need representative nodes and disks.

Workstream D: Postgres long-soak (the real partitioning decider)

The short A0 runs proved the steady-state shape (HOT-heavy, heap flat, autovacuum sawtooth). They do not prove the long-horizon behaviours that actually decide whether the simple schema survives at Trigger's scale. This is the test that settles the partitioning-vs-simple call I deferred.

What it must run: hours to days of sustained churn at a representative per-table transition rate and active-set size (1M-10M). Decouple from the laptop's fsync ceiling (use synchronous_commit=off or batched commits, plus higher concurrency) so the rate is representative even if the hardware isn't: bloat/vacuum dynamics extrapolate from the write rate relative to vacuum throughput, not from total data volume.
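One way to get the "batched commits" half of that decoupling, as a sketch: wrap every batchSize statements in a single transaction, optionally with synchronous_commit turned off for the session. The helper name and the { text, params } op shape are assumptions; client only needs node-postgres's query(text, params) method.

```javascript
// Hypothetical group-commit helper: one WAL flush per batch instead of per statement.
async function runGroupCommitted(client, ops, batchSize, { asyncCommit = false } = {}) {
  // synchronous_commit = off is a per-session setting: COMMIT returns before the WAL
  // flush, so a crash can lose the last few commits but does not corrupt data.
  if (asyncCommit) await client.query("SET synchronous_commit = off");
  let commits = 0;
  for (let i = 0; i < ops.length; i += batchSize) {
    await client.query("BEGIN");
    try {
      for (const op of ops.slice(i, i + batchSize)) await client.query(op.text, op.params);
      await client.query("COMMIT");
      commits++;
    } catch (err) {
      await client.query("ROLLBACK");
      throw err;
    }
  }
  return commits;
}
```

Batching CAS updates this way holds each row lock until the batch commits, which is harmless when workers own disjoint shards (as in A0) but would inflate conflicts on a shared row.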

Hypotheses / what to watch over time:

  • Autovacuum keeps up. Dead-tuple % stays in a bounded band over hours, doesn't slowly creep. Sweep autovacuum_vacuum_scale_factor, cost limits, naptime, and per-table settings.
  • PK-index bloat trajectory. The one warning sign from A0 (0.4→0.9 MB in 120s). Does it plateau or grow until REINDEX CONCURRENTLY is needed, and at what cadence? Heap reuse is proven; the btree is the open question.
  • Freeze / wraparound. On a table taking billions of writes/month, characterise anti-wraparound (freeze) autovacuum behaviour: how often it triggers, how long it runs, and how age(relfrozenxid) tracks against autovacuum_freeze_max_age.
  • Sleeper xmin holdback. Confirm a weeks-open sleeper run does not pin dead tuples (each transition is its own short transaction, so it shouldn't, but prove it; also test a genuinely long-open transaction elsewhere blocking vacuum).
  • fillfactor sweep at the long horizon, since the HOT ratio interacts with how fast pages fill between vacuums.
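The plateau-vs-creep question for dead% and PK-index size can be made mechanical. A minimal sketch, assuming the soak's CSV has been parsed into { t, v } points with t in seconds; the second-half slope rule and the tolerance are assumptions to tune, not part of the plan.

```javascript
// Ordinary least-squares slope of v against t (units of v per second).
function slope(points) {
  const n = points.length;
  if (n < 2) return 0;
  const mt = points.reduce((s, p) => s + p.t, 0) / n;
  const mv = points.reduce((s, p) => s + p.v, 0) / n;
  let num = 0, den = 0;
  for (const p of points) {
    num += (p.t - mt) * (p.v - mv);
    den += (p.t - mt) ** 2;
  }
  return den === 0 ? 0 : num / den;
}

// Judge a series by the slope of its second half, so warm-up growth does not count
// against a later plateau. maxRisePerHour is an assumed tolerance in the series' units.
function classifyTrend(points, maxRisePerHour) {
  const tail = points.slice(Math.floor(points.length / 2));
  const perHour = slope(tail) * 3600;
  return { perHour, verdict: perHour <= maxRisePerHour ? "plateau" : "creeping" };
}
```

A bounded sawtooth (dead% oscillating between vacuums) fits to a near-zero slope over many cycles, while steady index growth shows up as a persistent positive one.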

Decision: if dead% and index size plateau under representative rate, the simple unpartitioned schema is vindicated for real. If autovacuum falls behind or index bloat forces frequent reindexing, partitioning (A1) earns its complexity back because a partition drop reclaims the index for free.

Phasing and decision gates

  1. Phase 0: A0 + A2. Cheapest and most decisive: does the simple schema hold, and is the CAS correct? Outcome gates whether we ever build partitioning.
  2. Phase 1: B1-B4 against S3, plus C (MinIO/SeaweedFS comparison). Characterizes the body path and picks the self-host backend.
  3. Phase 2: A1, only if Phase 0 shows a growth/bloat problem. Justifies partitioning with data.
  4. Phase 3: absolute numbers on a production-representative Postgres instance and same-region S3, replaying real transition traces. Confirms headroom and the shard threshold.

Each phase ends with a one-page report: the metric tables, the decision (does this change the design?), and a go/no-go.

What each result could change

  • A0 flat and HOT-heavy → drop partitioning from the design entirely (simpler).
  • A0 bloat/vacuum problems → partitioning is justified; A1 picks drop-vs-delete and sleeper handling.
  • B2 shows batching latency is intolerable → bodies stay one-per-snapshot and we lean on S3 Express or a cheaper backend instead.
  • B3 shows cold "since N" is slow for long runs → consider a small per-run index or capping cold scans.
  • C shows SeaweedFS clearly wins for small objects → it becomes the recommended self-host default over MinIO.

Rough effort

Phase 0: ~1 to 2 days (harness + runs + report). Phase 1 + C: ~2 to 3 days. Phase 2: ~1 day if triggered. Phase 3: depends on provisioning a representative environment.

Phase 0 results: Postgres head-row (A0 + A2)

Harness: /tmp/snapshot-poc/phase0.mjs driving Postgres 16 (container) via node-postgres (pg): 8 workers over a sharded active set for A0, 16 writers contending on a single row for A2.

Validity caveat (per the plan's ground rules): this is a local container. The shape results (HOT-update ratio, bloat trend, size-tracks-active, CAS correctness) are valid here. The throughput numbers are not: ~900 updates/sec is fsync-bound on the container disk because every statement auto-commits (one WAL flush each), and p50 ~7 ms per update is that fsync, not CPU. Absolute throughput is a Phase 3 question on production-representative hardware.
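A quick consistency check on the fsync-bound reading: in a closed loop where each of the 8 workers has exactly one statement in flight, Little's law gives throughput ≈ workers / latency. A sketch with hypothetical function names:

```javascript
// Closed-loop Little's law: each worker waits for its statement before issuing the next.
function closedLoopThroughput(workers, latencyMs) {
  return workers / (latencyMs / 1000); // statements per second
}

// Inverse: the mean per-statement latency implied by an observed rate.
function impliedLatencyMs(workers, perSecond) {
  return (workers / perSecond) * 1000;
}
```

8 workers at 7 ms gives about 1,143 statements/s, the same order as the observed ~900/s; inverting, ~900/s implies a mean of roughly 8.9 ms per statement, consistent with a ~7 ms p50 plus a slower tail and client overhead. Both numbers move with commit latency, not CPU.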

A2: CAS correctness (no lock at all)

16 writers racing a single run for 5,000 transitions:

| metric | result |
|---|---|
| final seq | 5,000 (= target) |
| distinct winners | 5,000 |
| duplicate seqs | 0 (no forked transition) |
| gaps in 1..5000 | 0 (no lost transition) |
| conflicts / retries | 42,363 |
| verdict | PASS: the head-row CAS serialises transitions with no lock |

The high conflict count is an artifact of all 16 writers hammering one run (worst case). Real transitions spread across runs, so conflicts are rare. The point stands: the compare-and-swap alone is sufficient for correctness, which is why it's a real backstop when the redlock expires.

A0: simplest schema (single table, delete-on-terminal, no partitioning)

| scenario | HOT ratio | heap (start → end) | index (start → end) | dead% range | autovacuums | size/row (heap + index) |
|---|---|---|---|---|---|---|
| 100k active, ff90, 60s | 96.85% | 9.5 → 9.7 MB (flat) | 3.9 MB (flat) | 0.5-1.6% | 1 | ~136 B |
| 1M active, ff90, 60s | 90.97% | 94.2 → 94.7 MB (flat) | 39.0 MB (flat) | 0.4% | 0 | ~134 B |
| 10k active, ff90, churn 120s | 99.46% | 1.0 → 1.4 MB (flat) | 0.4 → 0.9 MB | 2-8% sawtooth | 10 | ~140 B at start (~230 B at end) |
| 100k active, ff70, 60s | 98.32% | 12.4 MB (flat) | 3.9 MB (flat) | 0.4-1.5% | 1 | ~164 B |

The churn run is the important one: 83k updates + 14.8k deletes + 14.8k inserts over 120s, and the heap never grew past 1.4 MB (10k active rows). Autovacuum reclaimed deleted-row space every ~10s (the dead% sawtooth) and the space was reused. That is delete-on-terminal keeping the table bounded by the active set, empirically.

Findings

  1. Heap is bounded by active runs, not history. Flat under sustained churn at 10k, and flat (no growth) under pure update churn at 100k and 1M. Size is ~135-165 bytes/active-row including the PK index.
  2. HOT-update ratio is high and tunable: 91-97% out of the box, 98-99% with a lower fillfactor (more page headroom) and/or more frequent autovacuum. The mutated columns aren't indexed, so updates are HOT-eligible by design; the misses are just pages running out of free space before autovacuum catches up.
  3. Autovacuum keeps up on the churn workload (10 runs in 120s with autovacuum_naptime=10s), holding dead tuples in a bounded 2-8% band.
  4. One thing to watch: PK-index bloat under churn. The btree crept 0.4 → 0.9 MB over 120s of heavy delete/insert (heap stayed flat). Btree reclaims less eagerly than the heap, so a long-running, high-churn deployment wants either periodic REINDEX CONCURRENTLY or monitoring. This is the main item for a multi-hour Phase 3 run to characterise the steady state.
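One way to turn "periodic REINDEX CONCURRENTLY or monitoring" into a rule is to track PK-index bytes per live row against the value right after the index was built (here, after preload). A sketch where the function name and the 2x threshold are assumptions, not measured limits. Applied to the churn run (0.4 MB to 0.9 MB over ~10k live rows) it gives 40 to 90 B/row, a 2.25x ratio.

```javascript
// Hypothetical reindex trigger: compare current index bytes per live row with a
// baseline taken right after a fresh build. Re-baseline after each REINDEX.
function indexBloat(idxBytes, liveRows, baselineBytesPerRow, threshold = 2.0) {
  const perRow = idxBytes / liveRows;
  const ratio = perRow / baselineBytesPerRow;
  return { perRow, ratio, reindex: ratio >= threshold };
}
```

Feeding this the soak's index-size series alongside the trend check above separates a btree that settles at some higher steady ratio from one that keeps growing.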

Decision

The simplest schema holds at these scales, so partitioning (A1) is not needed in the baseline. The "avoid complexity" gate fires in favour of: one unpartitioned run_head table, delete-on-terminal, plain autovacuum. Partitioning stays a documented contingency for if a multi-hour run or much higher churn shows autovacuum or index bloat losing the race; it is not part of the v1 design.

What still needs Phase 3 (representative hardware)

  • Absolute throughput and latency (this run is fsync-bound and not representative).
  • Multi-hour steady state, specifically the PK-index bloat trend and whether periodic reindex is required.
  • 10M+ active rows, to confirm the size/throughput story holds at the top of the range.

Reproduce

docker run -d --name snap-poc-pg -e POSTGRES_PASSWORD=poc -e POSTGRES_DB=poc -p 55432:5432 \
  postgres:16 -c autovacuum_naptime=10s -c log_autovacuum_min_duration=0
export NODE_PATH=<repo>/node_modules/.pnpm/pg@8.16.3/node_modules
MODE=a2 CONCURRENCY=16 T_TARGET=5000 node /tmp/snapshot-poc/phase0.mjs
MODE=a0 ACTIVE=10000 TMIN=3 TMAX=6 DURATION_S=120 node /tmp/snapshot-poc/phase0.mjs

Phase 1 + Workstream C results: object storage

Harness: /tmp/snapshot-poc/phase1.mjs (AWS SDK for JavaScript v3, @aws-sdk/client-s3; works against MinIO, SeaweedFS, or real S3). Local containers: MinIO on :9900, SeaweedFS S3 on :8333. Defaults: 4 KB objects, concurrency 32, a 1,000-state run for cold reads, 20:1 batch factor.

Validity caveat: single-box containers. Correctness and the batching/read shape are valid here. Absolute PUT/GET latency and throughput are not, and were visibly noisy (MinIO PUT/s read 750, 590, then 167 across runs; SeaweedFS 622, 542, 388). The MinIO-vs-SeaweedFS ordering flipped between runs, so it is not decisive here; that comparison needs a controlled, repeated run on representative hardware (real S3 via the AWS credits).

What's solid

| check | MinIO | SeaweedFS | why it matters |
|---|---|---|---|
| read-after-write | OK | OK | the hot path doesn't depend on it, but cold reads do |
| list-after-write | OK | OK | "all states" / "since N" rely on strongly-consistent listing; both self-host backends provided it |
| batching 20:1 | 20x fewer PUTs, ~16x faster wall | 20x fewer PUTs, ~19x faster wall | the PUT-cost lever, confirmed |
| cold "all states" (1,000-state run) | 1 LIST page, ~240 ms | 1 LIST page, ~170 ms | prefix listing is cheap at this size |
| cold "since N/2" (StartAfter) | 499 keys, ~113 ms | 499 keys, ~97 ms | the cursor read returns only the tail |
| GET fan-out of 1,000 objects | ~2.7 s (~376 GET/s) | ~2.3 s (~433 GET/s) | this is the cold-read cost, hence debug-only |

Findings

  1. Self-host correctness holds. Both MinIO and SeaweedFS gave read-after-write and list-after-write consistency, which is what the "all states / since N" reads and the body round-trip depend on. (Real S3 has provided strong read-after-write and list consistency since December 2020.)
  2. Batching delivers ~linearly. 20:1 packing cut PUT request count 20x and wall time ~16-19x. This is the lever that turns the ~$35k/mo unbatched S3 PUT cost into <$2k.
  3. The cold read model works. "All states" for a 1,000-state run is a single LIST page; "since N" via StartAfter returns only the tail. The expense is the GET fan-out (hundreds of GET/s for thousands of small objects), confirming cold reads belong off the hot path. For very long runs (thousands of states) the fan-out is where you'd revisit (batch the bodies, or cap cold scans).
  4. Backend choice is undecided by this run. Both are correct and in the same ballpark; the throughput ordering is within local noise. Deciding MinIO vs SeaweedFS vs S3 needs the representative run.
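The arithmetic behind the ~$35k to <$2k claim, as a sketch. The per-request price is an assumption (S3 Standard's published $0.005 per 1,000 PUTs; check current regional pricing), and the ~7B snapshot bodies/month is back-derived from the $35k figure at that price (about 7 per run at ~1B runs/month), not measured. Storage, GET/LIST, and transfer costs are ignored.

```javascript
// Assumed S3 Standard PUT price in USD per 1,000 requests.
const PUT_USD_PER_1000 = 0.005;

// Monthly PUT request count and cost when `batchFactor` snapshots share one object.
function monthlyPutCost(snapshotsPerMonth, batchFactor) {
  const puts = Math.ceil(snapshotsPerMonth / batchFactor);
  return { puts, usd: (puts / 1000) * PUT_USD_PER_1000 };
}

const unbatched = monthlyPutCost(7e9, 1); // 7e9 PUTs, about $35,000
const batched = monthlyPutCost(7e9, 20);  // 3.5e8 PUTs, about $1,750
```

Because the cost is linear in PUT count, the batch factor divides it directly; the tradeoff B2 measures is the buffering latency added on the write path.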

Next: representative S3 run (uses the AWS credits)

Run the same harness against real S3 from a machine in the same region (ideally an EC2 instance in-VPC for representative latency). Credentials stay in your shell, never pasted anywhere:

cd /tmp/snapshot-poc            # has a clean SDK install
export REGION=us-east-1         # phase1.mjs reads REGION (default us-east-1), not AWS_REGION
BACKEND=s3 ENDPOINT=https://s3.us-east-1.amazonaws.com \
  ACCESS_KEY=... SECRET_KEY=... BUCKET=your-poc-bucket \
  OBJ_SIZE=4096 N_OBJECTS=20000 CONCURRENCY=64 READ_PER_RUN=5000 \
  node phase1.mjs

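That gives same-region PUT/GET/LIST latency distributions and the real batching cost curve: the absolute numbers the local run deliberately doesn't claim. 503/SlowDown behaviour under prefix fan-out shows up only indirectly, because the harness doesn't count throttles: the SDK's default retries absorb them as tail latency, or abort the run once retries are exhausted.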
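Counting throttles means retrying in the harness rather than inside the SDK. A sketch, assuming the S3Client is built with maxAttempts: 1 (a real client option) so the SDK does not retry and hide throttles itself; the wrapper name and backoff constants are hypothetical.

```javascript
// S3 throttling surfaces as error name "SlowDown" and/or HTTP 503.
const isThrottle = (err) =>
  err?.name === "SlowDown" || err?.$metadata?.httpStatusCode === 503;

// Retry fn on throttling with "full jitter" exponential backoff, counting each throttle.
async function withThrottleCount(fn, stats, { maxRetries = 5, baseMs = 50 } = {}) {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (err) {
      if (!isThrottle(err) || attempt >= maxRetries) throw err; // non-throttles propagate
      stats.throttles++;
      const delay = Math.random() * baseMs * 2 ** attempt;
      await new Promise((resolve) => setTimeout(resolve, delay));
    }
  }
}
```

In phase1.mjs this would wrap each call, e.g. withThrottleCount(() => put(k, body), stats), and the run would report stats.throttles / N_OBJECTS as the throttle rate.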

phase0.mjs

// Phase 0 POC harness for the run_head design.
// A0: simplest schema (single table, delete-on-terminal, no partitioning).
// Measures HOT-update ratio, bloat trend, size-vs-active, op latencies.
// A2: CAS correctness under raw concurrent contention (no lock at all),
// proving the head-row compare-and-swap serialises transitions by itself.
//
// Local-container run: throughput numbers are RELATIVE/shape only, not absolute.
// HOT ratio, bloat trend, size-tracks-active, and CAS correctness are valid here.
import { createRequire } from "module";
const require = createRequire(import.meta.url);
// Resolve pg via NODE_PATH (see Reproduce) rather than a machine-specific absolute path.
const { Pool } = require("pg");
const CONN = process.env.PG_URL || "postgres://postgres:poc@localhost:55432/poc";
const MODE = process.env.MODE || "a0";
const ACTIVE = parseInt(process.env.ACTIVE || "100000", 10);
const DURATION_S = parseInt(process.env.DURATION_S || "60", 10);
const CONCURRENCY = parseInt(process.env.CONCURRENCY || "8", 10);
const FILLFACTOR = parseInt(process.env.FILLFACTOR || "90", 10);
const SLEEPER_PCT = parseFloat(process.env.SLEEPER_PCT || "0.02");
const T_TARGET = parseInt(process.env.T_TARGET || "5000", 10); // a2 rounds
const TMIN = parseInt(process.env.TMIN || "5", 10); // transitions per run before complete
const TMAX = parseInt(process.env.TMAX || "20", 10);
const randInt = (lo, hi) => lo + Math.floor(Math.random() * (hi - lo + 1));
const pct = (arr, p) => {
  if (arr.length === 0) return 0;
  const s = arr.slice().sort((a, b) => a - b);
  return s[Math.min(s.length - 1, Math.floor((p / 100) * s.length))];
};
const fmt = (n) => n.toLocaleString("en-US");
async function setup(pool) {
  await pool.query("DROP TABLE IF EXISTS run_head");
  await pool.query(`CREATE TABLE run_head (
    run_id text PRIMARY KEY,
    seq bigint NOT NULL DEFAULT 0,
    execution_status text NOT NULL,
    run_status text NOT NULL,
    attempt_number int,
    created_at timestamptz NOT NULL DEFAULT now(),
    updated_at timestamptz NOT NULL DEFAULT now()
  ) WITH (fillfactor = ${FILLFACTOR})`);
  await pool.query("CREATE EXTENSION IF NOT EXISTS pgstattuple");
}
async function statTable(pool) {
  const s = await pool.query(
    `SELECT n_tup_ins, n_tup_upd, n_tup_hot_upd, n_tup_del, n_live_tup, n_dead_tup,
            autovacuum_count, vacuum_count
       FROM pg_stat_user_tables WHERE relname = 'run_head'`
  );
  return s.rows[0] || {};
}
async function sizeSample(pool) {
  const sz = await pool.query(
    `SELECT pg_table_size('run_head') AS heap,
            pg_indexes_size('run_head') AS idx,
            pg_total_relation_size('run_head') AS total`
  );
  const st = await pool.query(
    `SELECT tuple_count, dead_tuple_count, dead_tuple_percent, free_percent
       FROM pgstattuple('run_head')`
  );
  return { ...sz.rows[0], ...st.rows[0] };
}
async function preload(pool, n) {
  const t0 = Date.now();
  const BATCH = 50000;
  for (let off = 0; off < n; off += BATCH) {
    const lim = Math.min(BATCH, n - off);
    await pool.query(
      `INSERT INTO run_head (run_id, seq, execution_status, run_status)
       SELECT 'r' || g, 0, 'RUN_CREATED', 'PENDING'
         FROM generate_series($1::bigint, $2::bigint) g`,
      [off + 1, off + lim]
    );
  }
  return Date.now() - t0;
}
async function runA0() {
  const pool = new Pool({ connectionString: CONN, max: CONCURRENCY + 2 });
  await setup(pool);
  console.log(`[A0] active=${fmt(ACTIVE)} concurrency=${CONCURRENCY} fillfactor=${FILLFACTOR} duration=${DURATION_S}s sleepers=${(SLEEPER_PCT * 100).toFixed(0)}%`);
  const loadMs = await preload(pool, ACTIVE);
  console.log(`[A0] preloaded ${fmt(ACTIVE)} rows in ${loadMs}ms`);
  // in-memory active set, sharded across workers; a fraction parked (sleepers)
  const entries = new Array(ACTIVE);
  for (let i = 0; i < ACTIVE; i++) {
    const parked = Math.random() < SLEEPER_PCT;
    entries[i] = { id: "r" + (i + 1), seq: 0, target: parked ? Infinity : randInt(TMIN, TMAX), parked };
  }
  const statBefore = await statTable(pool);
  const samples = [];
  const lat = { upd: [], ins: [], del: [] };
  let ops = { upd: 0, ins: 0, del: 0 };
  const deadline = Date.now() + DURATION_S * 1000;
  const sampler = setInterval(async () => {
    try {
      const t = Math.round((Date.now() - (deadline - DURATION_S * 1000)) / 1000);
      samples.push({ t, ...(await sizeSample(pool)) });
    } catch {} // a sample that races pool.end() is simply dropped
  }, 5000);
  async function worker(wid) {
    if (wid >= ACTIVE) return; // no shard for this worker; avoids a synchronous spin until the deadline
    const client = await pool.connect();
    let ptr = wid;
    try {
      while (Date.now() < deadline) {
        // round-robin across this worker's shard (stride = CONCURRENCY)
        if (ptr >= ACTIVE) ptr = wid;
        const e = entries[ptr];
        ptr += CONCURRENCY;
        if (!e || e.parked) continue;
        let s = process.hrtime.bigint();
        const r = await client.query(
          `UPDATE run_head SET seq = seq + 1, execution_status = 'EXECUTING',
                  run_status = 'EXECUTING', attempt_number = 1, updated_at = now()
            WHERE run_id = $1 AND seq = $2 RETURNING seq`,
          [e.id, e.seq]
        );
        lat.upd.push(Number(process.hrtime.bigint() - s) / 1e6);
        ops.upd++;
        if (r.rowCount === 1) e.seq = Number(r.rows[0].seq);
        if (e.seq >= e.target) {
          // terminal: delete the head row, then replace it with a fresh run
          s = process.hrtime.bigint();
          await client.query(`DELETE FROM run_head WHERE run_id = $1`, [e.id]);
          lat.del.push(Number(process.hrtime.bigint() - s) / 1e6);
          ops.del++;
          const nid = `w${wid}-${ops.ins}`;
          s = process.hrtime.bigint();
          await client.query(
            `INSERT INTO run_head (run_id, seq, execution_status, run_status)
             VALUES ($1, 0, 'RUN_CREATED', 'PENDING')`,
            [nid]
          );
          lat.ins.push(Number(process.hrtime.bigint() - s) / 1e6);
          ops.ins++;
          entries[ptr - CONCURRENCY] = { id: nid, seq: 0, target: randInt(TMIN, TMAX), parked: false };
        }
      }
    } finally {
      client.release();
    }
  }
  const t0 = Date.now();
  await Promise.all(Array.from({ length: CONCURRENCY }, (_, w) => worker(w)));
  const elapsed = (Date.now() - t0) / 1000;
  clearInterval(sampler);
  const statAfter = await statTable(pool);
  const finalSize = await sizeSample(pool);
  const d = (k) => Number(statAfter[k] || 0) - Number(statBefore[k] || 0);
  const upd = d("n_tup_upd"), hot = d("n_tup_hot_upd");
  const totalOps = ops.upd + ops.ins + ops.del;
  console.log(`\n===== A0 RESULTS (active=${fmt(ACTIVE)}) =====`);
  console.log(`elapsed: ${elapsed.toFixed(1)}s`);
  console.log(`ops total: ${fmt(totalOps)} (${fmt(Math.round(totalOps / elapsed))}/s)`);
  console.log(` updates: ${fmt(ops.upd)} (${fmt(Math.round(ops.upd / elapsed))}/s)`);
  console.log(` inserts: ${fmt(ops.ins)}`);
  console.log(` deletes: ${fmt(ops.del)}`);
  console.log(`HOT-update ratio: ${upd > 0 ? ((hot / upd) * 100).toFixed(2) : "n/a"}% (hot ${fmt(hot)} / upd ${fmt(upd)}) <-- want ~100%`);
  console.log(`update latency ms: p50 ${pct(lat.upd, 50).toFixed(2)} p99 ${pct(lat.upd, 99).toFixed(2)} p999 ${pct(lat.upd, 99.9).toFixed(2)}`);
  console.log(`insert latency ms: p50 ${pct(lat.ins, 50).toFixed(2)} p99 ${pct(lat.ins, 99).toFixed(2)}`);
  console.log(`delete latency ms: p50 ${pct(lat.del, 50).toFixed(2)} p99 ${pct(lat.del, 99).toFixed(2)}`);
  console.log(`final live tuples: ${fmt(Number(statAfter.n_live_tup || 0))} dead tuples: ${fmt(Number(statAfter.n_dead_tup || 0))}`);
  console.log(`autovacuum runs: ${d("autovacuum_count")} (during test)`);
  console.log(`heap size: ${(finalSize.heap / 1e6).toFixed(1)} MB index: ${(finalSize.idx / 1e6).toFixed(1)} MB total: ${(finalSize.total / 1e6).toFixed(1)} MB`);
  console.log(`pgstattuple: live tuples ${fmt(Number(finalSize.tuple_count))} dead% ${Number(finalSize.dead_tuple_percent).toFixed(2)} free% ${Number(finalSize.free_percent).toFixed(2)}`);
  console.log(`\nsize / bloat over time (heapMB, idxMB, dead%, free%):`);
  for (const s of samples) {
    console.log(` t=${String(s.t).padStart(3)}s heap ${(s.heap / 1e6).toFixed(1)} idx ${(s.idx / 1e6).toFixed(1)} dead% ${Number(s.dead_tuple_percent).toFixed(2)} free% ${Number(s.free_percent).toFixed(2)}`);
  }
  await pool.end();
}
async function runA2() {
  const pool = new Pool({ connectionString: CONN, max: CONCURRENCY + 2 });
  await setup(pool);
  await pool.query(`INSERT INTO run_head (run_id, seq, execution_status, run_status) VALUES ('cas', 0, 'RUN_CREATED', 'PENDING')`);
  console.log(`[A2] CAS correctness: ${CONCURRENCY} concurrent writers racing one run, target ${fmt(T_TARGET)} transitions, NO lock`);
  const winners = new Map(); // seq -> count, to detect dupes
  let conflicts = 0;
  let done = false;
  async function writer() {
    const client = await pool.connect();
    try {
      while (!done) {
        const cur = await client.query(`SELECT seq FROM run_head WHERE run_id = 'cas'`);
        const seq = Number(cur.rows[0].seq);
        if (seq >= T_TARGET) { done = true; break; }
        const r = await client.query(
          `UPDATE run_head SET seq = seq + 1, updated_at = now()
            WHERE run_id = 'cas' AND seq = $1 RETURNING seq`,
          [seq]
        );
        if (r.rowCount === 1) {
          const ns = Number(r.rows[0].seq);
          winners.set(ns, (winners.get(ns) || 0) + 1);
          if (ns >= T_TARGET) done = true;
        } else {
          conflicts++; // lost the race, retry
        }
      }
    } finally {
      client.release();
    }
  }
  const t0 = Date.now();
  await Promise.all(Array.from({ length: CONCURRENCY }, () => writer()));
  const elapsed = (Date.now() - t0) / 1000;
  const finalSeq = Number((await pool.query(`SELECT seq FROM run_head WHERE run_id = 'cas'`)).rows[0].seq);
  let dupes = 0, gaps = 0;
  for (const [, c] of winners) if (c > 1) dupes++;
  for (let i = 1; i <= T_TARGET; i++) if (!winners.has(i)) gaps++;
  console.log(`\n===== A2 RESULTS =====`);
  console.log(`final seq: ${fmt(finalSeq)} (target ${fmt(T_TARGET)})`);
  console.log(`distinct winners: ${fmt(winners.size)}`);
  console.log(`duplicate seqs: ${dupes} <-- MUST be 0 (no forked transition)`);
  console.log(`gaps in 1..target: ${gaps} <-- MUST be 0 (no lost transition)`);
  console.log(`conflicts (retries): ${fmt(conflicts)} (the CAS doing real work under contention)`);
  console.log(`result: ${finalSeq === T_TARGET && dupes === 0 && gaps === 0 ? "PASS - CAS serialises transitions with no lock" : "FAIL"}`);
  console.log(`elapsed: ${elapsed.toFixed(1)}s`);
  await pool.end();
}
(MODE === "a2" ? runA2() : runA0()).catch((e) => { console.error(e); process.exit(1); });

phase1.mjs

// Phase 1 + Workstream C: object-storage harness for the snapshot body log.
// Works against any S3-compatible endpoint (MinIO, SeaweedFS, real AWS S3).
// B1 write latency/throughput for small objects across many run prefixes
// B2 batching tradeoff (objects-per-PUT) -> request count + wall time
// B3 cold reads: prefix LIST ("all states") and StartAfter ("since N") + GET fan-out
// B4 read-after-write and list-after-write consistency
//
// Local MinIO/SeaweedFS numbers are RELATIVE/shape + correctness only.
// Absolute latency/throughput needs real S3 (run this same harness with S3 env).
import { createRequire } from "module";
const require = createRequire(import.meta.url);
const SDK = process.env.SDK_PATH || "@aws-sdk/client-s3";
const { S3Client, PutObjectCommand, GetObjectCommand, ListObjectsV2Command, CreateBucketCommand, HeadBucketCommand } = require(SDK);
const BACKEND = process.env.BACKEND || "minio";
const presets = {
minio: { endpoint: "http://localhost:9900", ak: "minioadmin", sk: "minioadmin" },
seaweed: { endpoint: "http://localhost:8333", ak: "any", sk: "anysecret12345" },
s3: { endpoint: process.env.ENDPOINT, ak: process.env.ACCESS_KEY, sk: process.env.SECRET_KEY },
};
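const cfg = presets[BACKEND];
if (!cfg) throw new Error(`unknown BACKEND=${BACKEND} (expected one of: ${Object.keys(presets).join(", ")})`);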
const REGION = process.env.REGION || "us-east-1";
const BUCKET = process.env.BUCKET || "snap-poc";
const OBJ_SIZE = parseInt(process.env.OBJ_SIZE || "4096", 10);
const N_OBJECTS = parseInt(process.env.N_OBJECTS || "3000", 10);
const CONCURRENCY = parseInt(process.env.CONCURRENCY || "32", 10);
const PREFIXES = parseInt(process.env.PREFIXES || "300", 10);
const READ_PER_RUN = parseInt(process.env.READ_PER_RUN || "1000", 10);
const BATCH = parseInt(process.env.BATCH || "20", 10);
const fmt = (n) => Math.round(n).toLocaleString("en-US");
const pctl = (arr, p) => { if (!arr.length) return 0; const s = arr.slice().sort((a, b) => a - b); return s[Math.min(s.length - 1, Math.floor((p / 100) * s.length))]; };
const pad = (n) => String(n).padStart(8, "0");
const body = Buffer.alloc(OBJ_SIZE, 7);
const s3 = new S3Client({
  endpoint: cfg.endpoint,
  region: REGION,
  forcePathStyle: true,
  credentials: { accessKeyId: cfg.ak, secretAccessKey: cfg.sk },
});
async function runPool(items, concurrency, fn) {
  let i = 0;
  const lat = [];
  async function worker() {
    while (i < items.length) {
      const it = items[i++];
      const s = process.hrtime.bigint();
      await fn(it);
      lat.push(Number(process.hrtime.bigint() - s) / 1e6);
    }
  }
  await Promise.all(Array.from({ length: concurrency }, () => worker()));
  return lat;
}
async function ensureBucket() {
  try {
    await s3.send(new HeadBucketCommand({ Bucket: BUCKET }));
  } catch {
    try {
      await s3.send(new CreateBucketCommand({ Bucket: BUCKET }));
    } catch (e) {
      if (!/BucketAlreadyOwnedByYou|BucketAlreadyExists/.test(String(e))) throw e;
    }
  }
}
const put = (Key, Body) => s3.send(new PutObjectCommand({ Bucket: BUCKET, Key, Body }));
async function get(Key) {
  const r = await s3.send(new GetObjectCommand({ Bucket: BUCKET, Key }));
  return (await r.Body.transformToByteArray()).length;
}
async function listAll(prefix, startAfter) {
  let token, pages = 0, keys = 0;
  const out = [];
  do {
    // EncodingType url avoids list-XML deserialization breaking on special chars
    const r = await s3.send(new ListObjectsV2Command({
      Bucket: BUCKET,
      Prefix: prefix,
      EncodingType: "url",
      ContinuationToken: token,
      StartAfter: token ? undefined : startAfter,
    }));
    pages++;
    keys += r.KeyCount || 0;
    for (const o of r.Contents || []) out.push(decodeURIComponent(o.Key));
    token = r.IsTruncated ? r.NextContinuationToken : undefined;
  } while (token);
  return { pages, keys, out };
}
async function main() {
  console.log(`\n######## BACKEND=${BACKEND} (${cfg.endpoint}) bucket=${BUCKET} objSize=${OBJ_SIZE}B concurrency=${CONCURRENCY} ########`);
  await ensureBucket();

  // B4 consistency: write a probe, then immediately GET it and LIST for it
  const probeKey = `cc/${Date.now()}/probe`;
  await put(probeKey, body);
  let raw = "n/a", law = "n/a";
  try { raw = (await get(probeKey)) === OBJ_SIZE ? "OK" : "MISMATCH"; } catch { raw = "FAIL"; }
  try { const l = await listAll(`cc/`); law = l.out.includes(probeKey) ? "OK" : "MISSING"; } catch { law = "FAIL"; }
  console.log(`B4 read-after-write: ${raw} list-after-write: ${law}`);

  // B1 write: N_OBJECTS small PUTs spread round-robin over PREFIXES run prefixes
  const keys = Array.from({ length: N_OBJECTS }, (_, i) => `runs/run${i % PREFIXES}/${pad(Math.floor(i / PREFIXES))}`);
  const t0 = Date.now();
  const wlat = await runPool(keys, CONCURRENCY, (k) => put(k, body));
  const welapsed = (Date.now() - t0) / 1000;
  console.log(`B1 write: ${fmt(N_OBJECTS)} objs in ${welapsed.toFixed(1)}s = ${fmt(N_OBJECTS / welapsed)} PUT/s`);
  console.log(` PUT latency ms: p50 ${pctl(wlat, 50).toFixed(1)} p99 ${pctl(wlat, 99).toFixed(1)} p999 ${pctl(wlat, 99.9).toFixed(1)}`);

  // B2 batching: same logical snapshots, one-per-object vs BATCH-per-object
  const G = Math.min(2000, N_OBJECTS);
  const unbatchedKeys = Array.from({ length: G }, (_, i) => `b2/unbatched/${pad(i)}`);
  let s = Date.now();
  await runPool(unbatchedKeys, CONCURRENCY, (k) => put(k, body));
  const ub = (Date.now() - s) / 1000;
  const batchedKeys = Array.from({ length: Math.ceil(G / BATCH) }, (_, i) => `b2/batched/${pad(i)}`);
  const bigBody = Buffer.alloc(OBJ_SIZE * BATCH, 7);
  s = Date.now();
  await runPool(batchedKeys, CONCURRENCY, (k) => put(k, bigBody));
  const bt = (Date.now() - s) / 1000;
  console.log(`B2 batching (${fmt(G)} snapshots): unbatched ${fmt(G)} PUTs in ${ub.toFixed(1)}s vs batched ${fmt(batchedKeys.length)} PUTs in ${bt.toFixed(1)}s (${BATCH}:1 -> ${(G / batchedKeys.length).toFixed(0)}x fewer requests, ${(ub / bt).toFixed(1)}x faster wall)`);

  // B3 cold reads: preload one run prefix with READ_PER_RUN objects.
  // A fresh prefix per invocation keeps objects left by earlier runs out of the counts.
  const rprefix = `read-run/${Date.now()}/`;
  await runPool(Array.from({ length: READ_PER_RUN }, (_, i) => `${rprefix}${pad(i)}`), CONCURRENCY, (k) => put(k, body));
  s = Date.now();
  const all = await listAll(rprefix);
  const listAllMs = Date.now() - s;
  const sinceFrom = `${rprefix}${pad(Math.floor(READ_PER_RUN / 2))}`;
  s = Date.now();
  const since = await listAll(rprefix, sinceFrom);
  const sinceMs = Date.now() - s;
  s = Date.now();
  await runPool(all.out, CONCURRENCY, (k) => get(k));
  const getMs = Date.now() - s;
  console.log(`B3 cold reads (run with ${fmt(READ_PER_RUN)} states):`);
  console.log(` all states: LIST ${all.keys} keys in ${all.pages} page(s), ${listAllMs}ms | since N/2: ${since.keys} keys in ${sinceMs}ms | GET fan-out of all: ${getMs}ms (${fmt(all.out.length / (getMs / 1000))} GET/s)`);
}
main().catch((e) => { console.error("ERROR", e?.message || e); process.exit(1); });
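The B3 "since N/2" read works because S3 returns ListObjectsV2 results in ascending UTF-8 binary order and `StartAfter` returns only keys that sort strictly after the given key. Listing "states since N" is therefore only correct if the per-run sequence in each key sorts in the same order as the number it encodes. A minimal in-memory sketch of that argument, not a call to S3: `padSeq` and its width of 10 are assumptions standing in for the harness's `pad` (whose width is not shown here), and `listAfter` is a hypothetical simulation of `Prefix` + `StartAfter`.

```javascript
// Hypothetical stand-in for the harness's pad(): fixed-width, zero-padded sequence.
const WIDTH = 10; // assumption; the real pad() width is not shown
const padSeq = (n) => String(n).padStart(WIDTH, "0");

// Simulates ListObjectsV2 with Prefix + StartAfter over an in-memory key set:
// keys under the prefix are sorted in ascending order (ASCII keys, so this matches
// binary order) and only those strictly after startAfter are returned.
function listAfter(allKeys, prefix, startAfter) {
  return [...allKeys]
    .filter((k) => k.startsWith(prefix))
    .sort()
    .filter((k) => startAfter === undefined || k > startAfter);
}

const N = 12;
const padded = Array.from({ length: N }, (_, i) => `read-run/${padSeq(i)}`);
const unpadded = Array.from({ length: N }, (_, i) => `read-run/${i}`);

// Padded keys: "since state 6" returns states 7..11 in order.
const sincePadded = listAfter(padded, "read-run/", `read-run/${padSeq(6)}`);
// Unpadded keys: "10" and "11" sort before "6", so "since 6" silently misses them.
const sinceUnpadded = listAfter(unpadded, "read-run/", "read-run/6");

console.log(sincePadded.map((k) => Number(k.slice("read-run/".length))));
console.log(sinceUnpadded.map((k) => Number(k.slice("read-run/".length))));
```

Run as-is, the padded listing returns states 7 to 11 while the unpadded one returns only 7 to 9: a "since" read over unpadded keys would drop the most recent states without any error.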
// Workstream D: Postgres long-soak harness for the run_head design.
// Decides the partitioning-vs-simple question by watching long-horizon dynamics
// the short A0 runs can't: autovacuum keep-up, PK-index bloat trajectory,
// xid-freeze age, and long-open-transaction (xmin) holdback.
//
// Rate, not hardware, is what makes bloat/vacuum dynamics extrapolate, so this
// decouples from the laptop fsync ceiling via synchronous_commit=off plus
// committing COMMIT_BATCH updates per transaction.
// Emits a CSV time series to stdout (redirect to a file and plot).
//
// Env:
// ACTIVE=1000000 DURATION_S=3600 CONCURRENCY=16 FILLFACTOR=90
// SYNC_COMMIT=off COMMIT_BATCH=10 TMIN=5 TMAX=20 SLEEPER_PCT=0.02
// SAMPLE_S=30 REINDEX_EVERY_S=0 HOLD_TXN=0 (1 = open a long idle txn to test xmin holdback)
// AV_RELOPTS="autovacuum_vacuum_scale_factor=0.02,autovacuum_vacuum_cost_limit=2000"
// PG_URL=postgres://postgres:poc@localhost:55432/poc
import { createRequire } from "module";
const require = createRequire(import.meta.url);
const { Pool, Client } = require("pg"); // pg@8.16.3; resolved from the nearest node_modules
const CONN = process.env.PG_URL || "postgres://postgres:poc@localhost:55432/poc";
const ACTIVE = parseInt(process.env.ACTIVE || "1000000", 10);
const DURATION_S = parseInt(process.env.DURATION_S || "3600", 10);
const CONCURRENCY = parseInt(process.env.CONCURRENCY || "16", 10);
const FILLFACTOR = parseInt(process.env.FILLFACTOR || "90", 10);
const SYNC_COMMIT = (process.env.SYNC_COMMIT || "off").toLowerCase();
const COMMIT_BATCH = parseInt(process.env.COMMIT_BATCH || "10", 10);
const TMIN = parseInt(process.env.TMIN || "5", 10);
const TMAX = parseInt(process.env.TMAX || "20", 10);
const SLEEPER_PCT = parseFloat(process.env.SLEEPER_PCT || "0.02");
const SAMPLE_S = parseInt(process.env.SAMPLE_S || "30", 10);
const REINDEX_EVERY_S = parseInt(process.env.REINDEX_EVERY_S || "0", 10);
const HOLD_TXN = process.env.HOLD_TXN === "1";
const AV_RELOPTS = process.env.AV_RELOPTS || "";
const randInt = (lo, hi) => lo + Math.floor(Math.random() * (hi - lo + 1));
const fmt = (n) => Math.round(n).toLocaleString("en-US");
const num = (v) => Number(v || 0);
async function setup(pool) {
await pool.query("DROP TABLE IF EXISTS run_head");
const reloptions = [`fillfactor = ${FILLFACTOR}`, ...(AV_RELOPTS ? AV_RELOPTS.split(",").map((s) => s.trim()) : [])].join(", ");
await pool.query(`CREATE TABLE run_head (
run_id text PRIMARY KEY,
seq bigint NOT NULL DEFAULT 0,
execution_status text NOT NULL,
run_status text NOT NULL,
attempt_number int,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now()
) WITH (${reloptions})`);
await pool.query("CREATE EXTENSION IF NOT EXISTS pgstattuple");
const BATCH = 100000;
for (let off = 0; off < ACTIVE; off += BATCH) {
const lim = Math.min(BATCH, ACTIVE - off);
await pool.query(
`INSERT INTO run_head (run_id, seq, execution_status, run_status)
SELECT 'r' || g, 0, 'RUN_CREATED', 'PENDING' FROM generate_series($1::bigint, $2::bigint) g`,
[off + 1, off + lim]
);
}
}
async function sample(pool) {
const st = (await pool.query(
`SELECT n_tup_upd, n_tup_hot_upd, n_dead_tup, n_live_tup, autovacuum_count
FROM pg_stat_user_tables WHERE relname='run_head'`
)).rows[0] || {};
const sz = (await pool.query(
`SELECT pg_table_size('run_head') heap, pg_indexes_size('run_head') idx`
)).rows[0] || {};
const tt = (await pool.query(
`SELECT dead_tuple_percent dead, approx_free_percent free FROM pgstattuple_approx('run_head')`
)).rows[0] || {};
let idx = {};
try { idx = (await pool.query(`SELECT avg_leaf_density, leaf_fragmentation FROM pgstatindex('run_head_pkey')`)).rows[0] || {}; } catch {}
const age = (await pool.query(`SELECT age(relfrozenxid) a FROM pg_class WHERE relname='run_head'`)).rows[0] || {};
return { ...st, ...sz, ...tt, ...idx, ...age };
}
async function main() {
const pool = new Pool({ connectionString: CONN, max: CONCURRENCY + 2 });
console.error(`[D] active=${fmt(ACTIVE)} dur=${DURATION_S}s conc=${CONCURRENCY} ff=${FILLFACTOR} sync_commit=${SYNC_COMMIT} commit_batch=${COMMIT_BATCH} reindex_every=${REINDEX_EVERY_S}s hold_txn=${HOLD_TXN} relopts="${AV_RELOPTS}"`);
await setup(pool);
console.error(`[D] preloaded ${fmt(ACTIVE)} rows`);
// Optional: hold a long-open idle transaction to demonstrate xmin holdback.
let holder;
if (HOLD_TXN) {
holder = new Client({ connectionString: CONN });
await holder.connect();
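// REPEATABLE READ keeps the transaction snapshot, and therefore this backend's xmin,
// until ROLLBACK. Under the default READ COMMITTED the snapshot is dropped after each
// statement, so an idle txn that only ran SELECT 1 would not hold back vacuum.
await holder.query("BEGIN ISOLATION LEVEL REPEATABLE READ");
await holder.query("SELECT 1"); // first statement takes the snapshot that pins xmin for the whole run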
console.error(`[D] HOLD_TXN: a long-open transaction is pinning xmin (vacuum should not be able to reclaim dead tuples created after now)`);
}
const entries = new Array(ACTIVE);
for (let i = 0; i < ACTIVE; i++) {
const parked = Math.random() < SLEEPER_PCT;
entries[i] = { id: "r" + (i + 1), seq: 0, target: parked ? Infinity : randInt(TMIN, TMAX), parked };
}
const ops = { upd: 0, ins: 0, del: 0 };
const start = Date.now();
const deadline = start + DURATION_S * 1000;
let lastUpd = 0, lastHot = 0, lastSampleOps = 0, lastT = 0;
console.log("t_s,ops,upd_per_s,hot_ratio_window_pct,dead_pct,free_pct,heap_mb,idx_mb,idx_leaf_density,idx_fragmentation,n_dead_tup,autovac_count,xid_age");
const sampler = setInterval(async () => {
try {
const t = Math.round((Date.now() - start) / 1000);
const s = await sample(pool);
const dUpd = num(s.n_tup_upd) - lastUpd, dHot = num(s.n_tup_hot_upd) - lastHot;
const hotW = dUpd > 0 ? (dHot / dUpd) * 100 : 0;
const upsPerS = lastT > 0 ? (ops.upd - lastSampleOps) / (t - lastT) : ops.upd / Math.max(1, t);
lastUpd = num(s.n_tup_upd); lastHot = num(s.n_tup_hot_upd); lastSampleOps = ops.upd; lastT = t;
console.log([
t, ops.upd, Math.round(upsPerS), hotW.toFixed(1),
num(s.dead).toFixed(2), num(s.free).toFixed(2),
(num(s.heap) / 1e6).toFixed(1), (num(s.idx) / 1e6).toFixed(1),
s.avg_leaf_density != null ? num(s.avg_leaf_density).toFixed(1) : "",
s.leaf_fragmentation != null ? num(s.leaf_fragmentation).toFixed(1) : "",
num(s.n_dead_tup), num(s.autovacuum_count), num(s.a),
].join(","));
} catch (e) { console.error("sample error", e.message); }
}, SAMPLE_S * 1000);
let reindexer;
if (REINDEX_EVERY_S > 0) {
reindexer = setInterval(async () => {
try {
const before = num((await pool.query(`SELECT pg_indexes_size('run_head') i`)).rows[0].i);
const t0 = Date.now();
await pool.query("REINDEX INDEX CONCURRENTLY run_head_pkey");
const after = num((await pool.query(`SELECT pg_indexes_size('run_head') i`)).rows[0].i);
console.error(`[D] REINDEX pkey: ${(before / 1e6).toFixed(1)}MB -> ${(after / 1e6).toFixed(1)}MB in ${Date.now() - t0}ms`);
} catch (e) { console.error("reindex error", e.message); }
}, REINDEX_EVERY_S * 1000);
}
const updSQL = `UPDATE run_head SET seq=seq+1, execution_status='EXECUTING', run_status='EXECUTING', attempt_number=1, updated_at=now() WHERE run_id=$1 AND seq=$2 RETURNING seq`;
async function worker(wid) {
const client = await pool.connect();
await client.query(`SET synchronous_commit = ${SYNC_COMMIT}`);
let ptr = wid;
try {
while (Date.now() < deadline) {
await client.query("BEGIN");
for (let b = 0; b < COMMIT_BATCH && Date.now() < deadline; b++) {
if (ptr >= ACTIVE) ptr = wid;
const e = entries[ptr]; ptr += CONCURRENCY;
if (!e || e.parked) continue;
const r = await client.query(updSQL, [e.id, e.seq]);
ops.upd++;
if (r.rowCount === 1) e.seq = Number(r.rows[0].seq);
if (e.seq >= e.target) {
await client.query(`DELETE FROM run_head WHERE run_id=$1`, [e.id]); ops.del++;
const nid = `w${wid}-${ops.ins}`;
await client.query(`INSERT INTO run_head (run_id, seq, execution_status, run_status) VALUES ($1,0,'RUN_CREATED','PENDING')`, [nid]); ops.ins++;
entries[ptr - CONCURRENCY] = { id: nid, seq: 0, target: randInt(TMIN, TMAX), parked: false };
}
}
await client.query("COMMIT");
}
} finally { try { await client.query("ROLLBACK"); } catch {} client.release(); }
}
await Promise.all(Array.from({ length: CONCURRENCY }, (_, w) => worker(w)));
clearInterval(sampler); if (reindexer) clearInterval(reindexer);
const final = await sample(pool);
const elapsed = (Date.now() - start) / 1000;
if (holder) { await holder.query("ROLLBACK").catch(() => {}); await holder.end().catch(() => {}); }
console.error(`\n===== D SUMMARY =====`);
console.error(`elapsed: ${elapsed.toFixed(0)}s`);
console.error(`ops: upd ${fmt(ops.upd)} (${fmt(ops.upd / elapsed)}/s) ins ${fmt(ops.ins)} del ${fmt(ops.del)}`);
console.error(`final dead%: ${num(final.dead).toFixed(2)} free%: ${num(final.free).toFixed(2)} n_dead_tup: ${fmt(num(final.n_dead_tup))}`);
console.error(`final heap: ${(num(final.heap) / 1e6).toFixed(1)} MB index: ${(num(final.idx) / 1e6).toFixed(1)} MB`);
console.error(`index leaf: density ${final.avg_leaf_density != null ? num(final.avg_leaf_density).toFixed(1) : "?"}% fragmentation ${final.leaf_fragmentation != null ? num(final.leaf_fragmentation).toFixed(1) : "?"}%`);
console.error(`autovacuum runs: ${num(final.autovacuum_count)} final xid age: ${fmt(num(final.a))}`);
console.error(HOLD_TXN
? `xmin test: with a long-open txn held, dead tuples should be UNRECLAIMABLE (watch n_dead_tup climb without bound).`
: `xmin test: no long-open txn; sleepers are old ROWS only, which do NOT block vacuum.`);
await pool.end();
}
main().catch((e) => { console.error(e); process.exit(1); });
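The soak sampler reports `hot_ratio_window_pct` from the deltas of two cumulative `pg_stat_user_tables` counters, `n_tup_hot_upd` and `n_tup_upd`, rather than from their running ratio. Over an hour-long soak a cumulative ratio would bury a late drop in HOT updates (for example when pages run out of free space for new tuple versions) under the earlier good windows. A minimal sketch of that delta calculation as a pure function; `hotWindow` is a hypothetical name, and the counter values are illustrative inputs, not measured results.

```javascript
// Hypothetical helper mirroring the sampler's delta logic: given the previous and
// current cumulative counters, return the HOT ratio (%) for just this window.
function hotWindow(prev, cur) {
  const dUpd = cur.n_tup_upd - prev.n_tup_upd;
  const dHot = cur.n_tup_hot_upd - prev.n_tup_hot_upd;
  // Like the harness, report 0 when no updates landed in the window.
  return dUpd > 0 ? (dHot / dUpd) * 100 : 0;
}

// Illustrative counters only (not measured data): three samples, where the
// first window is 99% HOT and the second drops to 60%.
const samples = [
  { n_tup_upd: 0, n_tup_hot_upd: 0 },
  { n_tup_upd: 10000, n_tup_hot_upd: 9900 },
  { n_tup_upd: 20000, n_tup_hot_upd: 15900 },
];

const windows = samples.slice(1).map((s, i) => hotWindow(samples[i], s));
const last = samples[samples.length - 1];
const cumulative = (last.n_tup_hot_upd / last.n_tup_upd) * 100;

console.log(windows.map((w) => w.toFixed(1))); // per-window ratios
console.log(cumulative.toFixed(1)); // whole-run ratio
```

With these inputs the windows come out at 99.0% and 60.0% while the whole-run ratio is 79.5%, so the cumulative figure hides the second window's collapse.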