@ForbesLindesay
Forked from samwho/peek.ts
Created August 5, 2025 18:09
Peek a node.js stream in TypeScript
import { Readable, Transform } from "stream"

/**
 * Peek at the first `length` bytes of a stream without consuming them.
 *
 * NOTE: no data will be available to read from the output stream until
 * at least `length` bytes have been read from the input stream, or the
 * input stream has ended.
 *
 * @param input The input stream to read from.
 * @param length The number of bytes to peek.
 * @returns A promise that resolves to `{ peeked, output }`: `peeked` is a
 * promise for the first `length` bytes (rejected if the stream is shorter),
 * and `output` is a stream that includes the peeked data and any subsequent data.
 */
async function peekNode(input: Readable, length: number) {
  // Chunks held back until `length` bytes have arrived.
  const buffer: Buffer[] = []
  let bytesRead = 0
  let promiseControl:
    | {
        resolve: (value: Buffer) => void
        reject: (reason?: any) => void
      }
    | undefined
  const peeked = new Promise<Buffer>((resolve, reject) => {
    promiseControl = { resolve, reject }
  })
  const output = input.pipe(
    new Transform({
      transform(chunk: Buffer, _, callback) {
        if (bytesRead < length) {
          buffer.push(chunk)
          bytesRead += chunk.length
          if (bytesRead >= length) {
            promiseControl?.resolve(Buffer.concat(buffer).subarray(0, length))
            // Release everything held back so far, in arrival order.
            for (const buffered of buffer) {
              this.push(buffered)
            }
            buffer.length = 0 // Clear the buffer
          }
          callback()
        } else {
          // Peek already satisfied: pass data straight through.
          callback(null, chunk)
        }
      },
      flush(callback) {
        if (bytesRead < length) {
          promiseControl?.reject(new Error(`Stream shorter than ${length} bytes`))
          // The input ended early: still forward what was buffered so the
          // output does not silently drop data.
          for (const buffered of buffer) {
            this.push(buffered)
          }
          buffer.length = 0
        }
        callback()
      },
    })
  )
  // `pipe` does not forward errors, so propagate them by hand.
  input.on("error", err => {
    output.destroy(err)
    promiseControl?.reject(err)
  })
  return { peeked, output }
}

let readCalled = false
const inputStream = new Readable({
  async read() {
    if (readCalled) return
    readCalled = true
    // Simulate a stream that emits data
    this.push(Buffer.from([1, 2]))
    await new Promise(resolve => setTimeout(resolve, 1_000))
    this.push(Buffer.from([3, 4, 5]))
    await new Promise(resolve => setTimeout(resolve, 1_000))
    this.push(Buffer.from([6, 7, 8]))
    await new Promise(resolve => setTimeout(resolve, 1_000))
    this.push(null) // End the stream
  },
})

peekNode(inputStream, 3)
  .then(({ peeked, output }) => {
    peeked.then(data => {
      console.log(`peeked data:`, data)
    })
    output.on("data", chunk => {
      console.log(`output chunk:`, chunk)
    })
  })
  .catch(err => {
    console.error(`Error:`, err.message)
  })
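
For comparison, the same idea can be sketched without a `Transform`, by pulling chunks through the stream's async iterator and replaying them afterwards. This is only a rough alternative, not the gist's approach: `peekWithIterator` is a hypothetical name, it assumes the input yields `Buffer` chunks, and unlike `peekNode` it returns a shorter buffer instead of rejecting when the stream ends early.

```typescript
import { Readable } from "node:stream"

// Hypothetical helper (not part of the gist above).
// Assumes `input` yields Buffer chunks (no encoding set, not object mode).
async function peekWithIterator(input: Readable, length: number) {
  const iterator = input[Symbol.asyncIterator]()
  const held: Buffer[] = []
  let bytesRead = 0
  let ended = false

  // Pull chunks until enough bytes have arrived or the input ends.
  while (bytesRead < length) {
    const result = await iterator.next()
    if (result.done) {
      ended = true
      break
    }
    const chunk = result.value as Buffer
    held.push(chunk)
    bytesRead += chunk.length
  }

  // May be shorter than `length` if the input ended early.
  const peeked = Buffer.concat(held).subarray(0, length)

  // Replay the held chunks first, then continue with the rest of the input.
  async function* replay() {
    for (const chunk of held) {
      yield chunk
    }
    while (!ended) {
      const result = await iterator.next()
      if (result.done) return
      yield result.value as Buffer
    }
  }

  return { peeked, output: Readable.from(replay()) }
}
```

Here the returned promise settles only once the peeked bytes are available, so a caller can `await` it and inspect `peeked` before deciding how to consume `output`. Errors on the input should surface as a rejection from the iterator, which then propagates to `output`.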