-
-
Save ForbesLindesay/c5469c67a3bca006cb441fb19c00703d to your computer and use it in GitHub Desktop.
Peek a node.js stream in TypeScript
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * Peek at the next chunk of data in the stream without consuming it. | |
| * | |
| * NOTE: no data will be available to read from the output stream until | |
| * at least `length` bytes have been read from the input stream. | |
| * | |
| * @param input The input stream to read from. | |
| * @param length The number of bytes to peek. | |
| * @returns A promise that resolves to the peeked data and an output stream | |
| * that includes the peeked data and any subsequent data. | |
| */ | |
| async function peekNode(input: Readable, length: number) { | |
| const buffer: Buffer[] = [] | |
| let bytesRead = 0 | |
| let promiseControl: | |
| | { | |
| resolve: (value: Buffer) => void | |
| reject: (reason?: any) => void | |
| } | |
| | undefined | |
| const peeked = new Promise<Buffer>((resolve, reject) => { | |
| promiseControl = { resolve, reject } | |
| }) | |
| const output = input.pipe( | |
| new Transform({ | |
| transform(chunk: Buffer, _, callback) { | |
| if (bytesRead < length) { | |
| buffer.push(chunk) | |
| bytesRead += chunk.length | |
| if (bytesRead >= length) { | |
| promiseControl?.resolve(Buffer.concat(buffer).subarray(0, length)) | |
| for (const chunk of buffer) { | |
| this.push(chunk) | |
| } | |
| buffer.length = 0 // Clear the buffer | |
| } | |
| callback() | |
| } else { | |
| callback(null, chunk) | |
| } | |
| }, | |
| flush(callback) { | |
| if (bytesRead < length) { | |
| promiseControl?.reject(new Error(`Stream shorter than ${length} bytes`)) | |
| } | |
| callback() | |
| }, | |
| }) | |
| ) | |
| input.on("error", err => { | |
| output.emit("error", err) | |
| promiseControl?.reject(err) | |
| }) | |
| return { peeked, output } | |
| } | |
| let readCalled = false | |
| const inputStream = new Readable({ | |
| async read() { | |
| if (readCalled) return | |
| readCalled = true | |
| // Simulate a stream that emits data | |
| this.push(Buffer.from([1, 2])) | |
| await new Promise(resolve => setTimeout(resolve, 1_000)) | |
| this.push(Buffer.from([3, 4, 5])) | |
| await new Promise(resolve => setTimeout(resolve, 1_000)) | |
| this.push(Buffer.from([6, 7, 8])) | |
| await new Promise(resolve => setTimeout(resolve, 1_000)) | |
| this.push(null) // End the stream | |
| }, | |
| }) | |
| peekNode(inputStream, 3) | |
| .then(({ peeked, output }) => { | |
| peeked.then(data => { | |
| console.log(`peeked data:`, data) | |
| }) | |
| output.on("data", chunk => { | |
| console.log(`output chunk:`, chunk) | |
| }) | |
| }) | |
| .catch(err => { | |
| console.error(`Error:`, err.message) | |
| }) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment