Created
May 26, 2020 19:35
-
-
Save sbrl/33bc3afc170a12f6eaf49b4ca8e03602 to your computer and use it in GitHub Desktop.
Multi-process line-by-line reading from a single pipe in Node.js
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"use strict"; | |
import EventEmitter from 'events'; | |
import child_process from 'child_process'; | |
/**
 * Returns a promise that settles the next time a named event fires.
 * The listener is registered with `once`, so it is removed automatically
 * after the first emission.
 * @param	{EventEmitter}	obj			The emitter to listen on - must inherit from EventEmitter.
 * @param	{string}		event_name	The name of the event to wait for.
 * @return	{Promise}		Resolves with the first argument passed to the event (if any).
 */
function wait_for_event(obj, event_name) {
	const subscribe = (on_fired) => {
		obj.once(event_name, on_fired);
	};
	return new Promise(subscribe);
}
/**
 * Master-side coordinator for a pool of forked worker processes.
 *
 * Workers are handed "work" messages one at a time: only a single worker is
 * allowed to be reading at any moment (tracked by `someone_is_reading`), and
 * the slot is released when that worker reports "read_complete".
 *
 * Messages handled (see handle_message):
 *  - "read_complete": the worker has finished its read; free the slot.
 *  - "ready" / "done": the worker wants (more) work.
 *  - "end": the worker has nothing left to read; tell it to exit and forget it.
 */
class Controller extends EventEmitter {
	constructor() {
		super();
		/** Number of worker processes spawned by start(). */
		this.worker_count = 4;
		/** Workers that are still active (removed once they report "end"). */
		this.workers = [];
		/** Total number of work items handed out so far. */
		this.counter = 0;
		/** Value of `counter` at the time of the last stats report. */
		this.counter_last = 0;
		/** Timestamp (ms since epoch) of the last stats report; 0 = never. */
		this.stats_update_last = 0;
		/** True while a worker holds the (single) reading slot. */
		this.someone_is_reading = false;
	}

	/**
	 * Logs a message to stdout, tagged as coming from the master process.
	 * @param	{...any}	msg		The values to log.
	 */
	log_msg(...msg) {
		console.log(`[master] `, ...msg);
	}

	/**
	 * Prints how many items were dispatched since the previous report.
	 * Reports are rate-limited to at most one per second, so the figure is
	 * only approximately "per second" when calls are further apart.
	 */
	stats_update() {
		const now = Date.now();
		if(now - this.stats_update_last < 1000) return;

		const counter_diff = this.counter - this.counter_last;
		console.log(`Rate: ${counter_diff} items / sec`);
		this.counter_last = this.counter;
		this.stats_update_last = now;
	}

	/**
	 * Waits for the reading slot to become free, claims it, and sends the
	 * given worker a "work" message carrying the current item counter.
	 * @param	{ChildProcess}	worker	The worker to dispatch work to.
	 * @return	{Promise}		Resolves once the message has been handed to the worker (or skipped).
	 */
	async send_work(worker) {
		while(this.someone_is_reading) {
			await wait_for_event(this, "reading_complete");
		}

		// The worker may have reported "end" while it was queued for the slot.
		// Claiming the slot for it would block everyone else forever, since a
		// finished worker never answers with "read_complete".
		if(!this.workers.includes(worker)) return;

		this.someone_is_reading = true;
		worker.send({ event: "work", count: this.counter });
		this.counter++;
	}

	/**
	 * Handles a single IPC message from a worker.
	 * @param	{ChildProcess}	worker	The worker that sent the message.
	 * @param	{number}		i		The index the worker was spawned with (used for logging only).
	 * @param	{Object}		message	The message object; `message.event` selects the action.
	 */
	async handle_message(worker, i, message) {
		switch(message.event) {
			case "read_complete":
				this.someone_is_reading = false;
				this.emit("reading_complete");
				break;

			case "ready":
			case "done":
				// Ignore requests from workers that have already finished
				if(!this.workers.includes(worker)) break;
				this.stats_update();
				process.nextTick(() => this.send_work(worker));
				break;

			case "end": {
				worker.send({ event: "exit" });
				this.log_msg(`Worker ${i} exited`);
				// Look the worker up by identity: the spawn index `i` goes
				// stale as soon as an earlier worker has been removed.
				const index = this.workers.indexOf(worker);
				if(index !== -1) this.workers.splice(index, 1);
				break;
			}
		}
	}

	/**
	 * Forks `worker_count` copies of worker.mjs. Each child inherits the
	 * master's stdin, stdout and stderr (file descriptors 0, 1, 2) and gets
	 * an IPC channel for the messages handled by handle_message.
	 */
	start() {
		for(let i = 0; i < this.worker_count; i++) {
			this.log_msg(`Spawning worker ${i} / ${this.worker_count}`);
			const next = child_process.fork("worker.mjs", {
				stdio: [ 0, 1, 2, "ipc" ]
			});
			next.on("message", this.handle_message.bind(this, next, i));
			this.workers.push(next);
		}
	}
}
// Entry point: build the controller and spawn the worker pool.
const controller = new Controller();
controller.start();
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env bash
# Demonstration: several processes reading lines from one shared pipe.
# Generates a million numbered lines and pipes them into main.mjs.
# License: Mozilla Public Licence 2.0 (MPL-2.0)
seq 1 1000000 \
	| awk '{ print("LINE_START This is a number [" $0 "] more text more text LINE_END") }' \
	| node main.mjs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"use strict"; | |
import fs from 'fs'; | |
// Global scratch buffer, reused across calls to avoid unnecessary memory churn.
// It is replaced by a larger one whenever a line outgrows it.
let buffer = Buffer.alloc(4096);

/**
 * Synchronously reads a single line from a file descriptor, one byte at a
 * time, so that nothing beyond the terminating newline is consumed.
 * @param	{number}		fd	The file descriptor to read from.
 * @return	{string|null}	The line without its trailing "\n" (an empty
 * 							string for a blank line), or null if the end of
 * 							the input was reached before any byte was read.
 * 							A final line with no trailing newline is
 * 							returned as-is.
 */
function read_line_unbuffered(fd) {
	let i = 0;
	while(true) {
		const bytes_read = fs.readSync(fd, buffer, i, 1);

		if(bytes_read !== 1) {
			// fs.readSync returns 0 at end of input
			if(i === 0) return null;
			return buffer.toString("utf-8", 0, i);
		}

		// The end offset is exclusive, so the \n at index i is trimmed off
		if(buffer[i] === 0x0A)
			return buffer.toString("utf-8", 0, i);

		i++;
		if(i === buffer.length) {
			// Line is longer than the scratch buffer: grow it by 50%
			const new_buffer = Buffer.alloc(Math.ceil(buffer.length * 1.5));
			buffer.copy(new_buffer);
			buffer = new_buffer;
		}
	}
}
/**
 * Logs a message to stdout, tagged with this worker's process id.
 * @param	{...any}	msg		The values to log.
 */
function log_msg(...msg) {
	const tag = `[worker ${process.pid}] `;
	console.log(tag, ...msg);
}

// Announce that this worker process has started
log_msg(`hello, world`);
/**
 * Returns a promise that resolves (with no value) after a delay.
 * @param	{number}	ms	How long to wait, in milliseconds.
 * @return	{Promise}	Resolves once the timer has fired.
 */
function sleep_async(ms) {
	const start_timer = (wake) => {
		setTimeout(wake, ms);
	};
	return new Promise(start_timer);
}
/**
 * Handles IPC messages sent by the parent process:
 *  - "work": read one line from stdin (fd 0), report "read_complete" so the
 *    parent can release the reading slot, then process the line and report
 *    "done". If the read yields null, report "end" instead and stop there.
 *  - "exit": terminate this worker.
 */
process.on("message", async (message) => {
	switch(message.event) {
		case "work": {
			const next = read_line_unbuffered(0);
			// Tell the parent the read is over before doing any processing
			process.send({ event: "read_complete" });

			if(next == null) {
				log_msg(`Done reading`);
				process.send({ event: "end" });
				// Nothing was read, so there is nothing to process and no
				// reason to ask for more work by reporting "done".
				break;
			}

			// Per-line processing goes here; this demo only logs the line
			log_msg(`[${message.count}] Processed '${next}'`);
			process.send({ event: "done" });
			break;
		}
		case "exit":
			log_msg(`exiting`);
			process.exit(0);
	}
});

// Let the parent know this worker can accept work
process.send({ event: "ready" });
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment