Skip to content

Instantly share code, notes, and snippets.

@ansarizafar
Forked from td0m/clustered.ts
Created April 26, 2025 05:53

Revisions

  1. @td0m td0m created this gist Apr 25, 2025.
    86 changes: 86 additions & 0 deletions clustered.ts
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,86 @@
    import cluster from "node:cluster";
    import { availableParallelism } from "node:os";

    export async function clustered({
    numWorkers = availableParallelism(),
    primary,
    worker,
    recoverWorkers = true,
    shutdownPrimary,
    }: {
    numWorkers?: number;
    primary?: () => Promise<void>;
    worker: () => Promise<void>;
    recoverWorkers?: boolean;
    shutdownPrimary?: () => Promise<void>;
    }) {
    if (cluster.isPrimary) {
    process.on("SIGHUP", () => {
    console.info("SIGHUP");
    const workersAtSighup = Object.values(cluster.workers || {}).filter(
    (worker) => worker?.isConnected()
    );
    for (const worker of workersAtSighup) {
    const newWorker = cluster.fork();
    newWorker.on("listening", () => {
    console.info(`killing worker ${worker?.process.pid}`);
    worker?.disconnect();
    });
    }
    });

    const shutdown = async () => {
    for (const worker of Object.values(cluster.workers || {})) {
    worker?.disconnect();
    }

    if (shutdownPrimary) {
    await shutdownPrimary();
    }

    console.log("exiting now");
    process.exit(0);
    };

    // eslint-disable-next-line @typescript-eslint/no-misused-promises
    process.on("SIGTERM", async () => {
    console.info("SIGTERM");
    await shutdown();
    });

    // eslint-disable-next-line @typescript-eslint/no-misused-promises
    process.on("SIGINT", async () => {
    console.info("SIGINT");
    await shutdown();
    });

    // Fork
    for (let i = 0; i < numWorkers; i++) {
    cluster.fork();
    }

    if (recoverWorkers) {
    cluster.on("exit", (worker, code, signal) => {
    // we did this on purpose, so we don't need to restart the worker
    if (worker.exitedAfterDisconnect) {
    console.info(
    `worker ${worker.process.pid} exited after disconnect. not restarting.`
    );
    return;
    }
    console.info(
    { worker_pid: worker.process.pid, signal, code },
    "worker exited, restarting..."
    );
    cluster.fork();
    });
    }

    if (primary) {
    await primary();
    }
    } else {
    await worker();
    }
    }

    18 changes: 18 additions & 0 deletions index.ts
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,18 @@
    import { createServer } from "node:http";
    import { clustered } from "./clustered.ts";

    const server = createServer((req, res) => {
    res.writeHead(200, { "Content-Type": "text/plain" });
    res.end("Hello, World!\n");
    });

    await clustered({
    numWorkers: 3,
    primary: async () => {
    console.log("primary started!", process.pid);
    },
    worker: async () => {
    console.log("worker started!", process.pid);
    server.listen(3000);
    },
    });