Created
May 21, 2020 06:51
-
-
Save csotiriou/a844cc4a18445b78ba8268b29dfccdc3 to your computer and use it in GitHub Desktop.
Reading a file with backpressure using NodeJS
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
type PromiseThunk = (lineBatch: string[]) => Promise<any>; | |
/** | |
* | |
* @param filePath the file to open | |
* @param promiseThunk A function that returns a promise. This function will be called with the current batch lines as the arcument. | |
* @param bufferSize how many lines should we process at a time | |
*/ | |
export async function streamFile2(filePath: string, promiseThunk: PromiseThunk, bufferSize: number = 100) { | |
return new Promise<unknown>((resolve, reject) => { | |
//create the NodeJS read stream | |
const stream = fs.createReadStream(filePath, { encoding: 'utf8' }); | |
//how many lines should we process at a time? | |
let buffer = []; | |
stream | |
//ensure parsing line by line | |
.pipe(split2()) | |
//ensure that the next chunk will be processed by the | |
//stream only when we want to | |
.pipe( | |
through2((chunk, enc, callback) => { | |
//put the chunk along with the other ones | |
buffer.push(chunk.toString()); | |
if (buffer.length < bufferSize) { | |
callback(); //next step, no process | |
} else { | |
//call the method that creates a promise, and at the end | |
//just empty the buffer, and process the next chunk | |
promiseThunk(buffer).finally(() => { | |
buffer = []; | |
callback(); | |
}); | |
} | |
}), | |
) | |
.on('error', error => { | |
reject(error); | |
}) | |
.on('finish', () => { | |
//any remaining data still needs to be sent | |
//resolve the outer promise only when the final batch has completed processing | |
if (buffer.length > 0) { | |
promiseThunk(buffer).finally(() => { | |
resolve(true); | |
}); | |
} else { | |
resolve(true); | |
} | |
}); | |
}); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment