Last active
April 3, 2025 21:03
-
-
Save guest271314/69c07d422811e82674a8d864a5412a7f to your computer and use it in GitHub Desktop.
WebSocket server using Node.js builtins and WHATWG Streams
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env -S node | |
// WebSocket server using Node.js builtins and WHATWG Streams | |
// https://gist.github.com/robertrypula/b813ffe23a9489bae1b677f1608676c8 | |
// https://gist.github.com/guest271314/735377527389f1de6145f0ac71ca1e86 | |
import { readFileSync } from "node:fs"; | |
import { createServer } from "node:tls"; | |
import { Duplex } from "node:stream"; | |
const debugBuffer = (bufferName, buffer) => { | |
const length = buffer ? buffer.length : "---"; | |
// console.log(`:: DEBUG - ${bufferName} | ${length} | `, buffer, "\n"); | |
}; | |
/* | |
https://tools.ietf.org/html/rfc6455#section-5.2 | |
0 1 2 3 | |
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 | |
+-+-+-+-+-------+-+-------------+-------------------------------+ | |
|F|R|R|R| opcode|M| Payload len | Extended payload length | | |
|I|S|S|S| (4) |A| (7) | (16/64) | | |
|N|V|V|V| |S| | (if payload len==126/127) | | |
| |1|2|3| |K| | | | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - + | |
| Extended payload length continued, if payload len == 127 | | |
+ - - - - - - - - - - - - - - - +-------------------------------+ | |
| |Masking-key, if MASK set to 1 | | |
+-------------------------------+-------------------------------+ | |
| Masking-key (continued) | Payload Data | | |
+-------------------------------- - - - - - - - - - - - - - - - + | |
: Payload Data continued ... : | |
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + | |
| Payload Data continued ... | | |
+---------------------------------------------------------------+ | |
OpCode | |
%x0 denotes a continuation frame | |
%x1 denotes a text frame | |
%x2 denotes a binary frame | |
%x3–7 are reserved for further non-control frames | |
%x8 denotes a connection close | |
%x9 denotes a ping | |
%xA denotes a pong | |
%xB-F are reserved for further control frames | |
*/ | |
// --------------------------------------------------------- | |
const createWebSocketFrame = (payload) => { | |
const payloadLengthByteCount = payload.length < 126 ? 0 : 2; | |
const buffer = new ArrayBuffer(2 + payloadLengthByteCount + payload.length); | |
const view = new DataView(buffer); | |
let payloadOffset = 2; | |
if (payload.length >= Math.pow(2, 16)) { | |
throw new Error("Payload equal or bigger than 64 KiB is not supported"); | |
} | |
view.setUint8(0, 0b10000010); | |
view.setUint8(1, payload.length < 126 ? payload.length : 126); | |
if (payloadLengthByteCount > 0) { | |
view.setUint16(2, payload.length); | |
payloadOffset += payloadLengthByteCount; | |
} | |
for (let i = 0, j = payloadOffset; i < payload.length; i++, j++) { | |
view.setUint8(j, payload[i]); | |
} | |
return buffer; | |
}; | |
// --------------------------------------------------------- | |
const getParsedBuffer = (buffer) => { | |
const view = new DataView(buffer.buffer); | |
let bufferRemainingBytes; | |
let currentOffset = 0; | |
let maskingKey; | |
let payload; | |
if (currentOffset + 2 > buffer.length) { | |
return { payload: null, bufferRemainingBytes: buffer }; | |
} | |
const firstByte = view.getUint8(currentOffset++); | |
const secondByte = view.getUint8(currentOffset++); | |
const isFinalFrame = !!((firstByte >>> 7) & 0x1); | |
const opCode = firstByte & 0xf; | |
const isMasked = !!((secondByte >>> 7) & 0x1); // https://security.stackexchange.com/questions/113297 | |
let payloadLength = secondByte & 0x7f; | |
if (!isFinalFrame) { | |
console.log("[not final frame detected]\n"); | |
} | |
if (opCode === 0x8) { | |
console.log("[connection close frame]\n"); | |
// TODO read payload, for example payload equal to <0x03 0xe9> means 1001: | |
// 1001 indicates that an endpoint is "going away", such as a server | |
// going down or a browser having navigated away from a page. | |
// More info here: https://tools.ietf.org/html/rfc6455#section-7.4 | |
return { payload: null, bufferRemainingBytes: null }; | |
} | |
if (opCode !== 0x2 && opCode !== 0x0) { | |
throw new Error("Only binary and continuation frames are supported"); | |
} | |
if (payloadLength > 125) { | |
if (payloadLength === 126) { | |
if (currentOffset + 2 > buffer.length) { | |
return { payload: null, bufferRemainingBytes: buffer }; | |
} | |
payloadLength = view.getUint16(currentOffset); | |
currentOffset += 2; | |
} else { | |
throw new Error("Payload equal or bigger than 64 KiB is not supported"); | |
} | |
} | |
if (isMasked) { | |
if (currentOffset + 4 > buffer.length) { | |
return { payload: null, bufferRemainingBytes: buffer }; | |
} | |
maskingKey = view.getUint32(currentOffset); | |
currentOffset += 4; | |
} | |
if (currentOffset + payloadLength > buffer.length) { | |
console.log("[misalignment between WebSocket frame and NodeJs Buffer]\n"); | |
return { payload: null, bufferRemainingBytes: buffer }; | |
} | |
payload = new Uint8Array(payloadLength); | |
if (isMasked) { | |
for (let i = 0, j = 0; i < payloadLength; ++i, j = i % 4) { | |
const shift = j === 3 ? 0 : (3 - j) << 3; | |
const mask = (shift === 0 ? maskingKey : maskingKey >>> shift) & 0xff; | |
payload[i] = mask ^ view.getUint8(currentOffset++); | |
} | |
} else { | |
for (let i = 0; i < payloadLength; i++) { | |
payload[i] = view.getUint8(currentOffset++); | |
} | |
} | |
bufferRemainingBytes = new Uint8Array(buffer.length - currentOffset); | |
for (let i = 0; i < bufferRemainingBytes.length; i++) { | |
bufferRemainingBytes[i] = view.getUint8(currentOffset++); | |
} | |
return { payload, bufferRemainingBytes }; | |
}; | |
function parseWebSocketFrame(buffer) { | |
let bufferToParse = buffer; | |
let parsedBuffer; | |
do { | |
parsedBuffer = getParsedBuffer(bufferToParse); | |
debugBuffer("buffer", buffer); | |
debugBuffer("bufferToParse", bufferToParse); | |
debugBuffer("parsedBuffer.payload", parsedBuffer.payload); | |
debugBuffer( | |
"parsedBuffer.bufferRemainingBytes", | |
parsedBuffer.bufferRemainingBytes, | |
); | |
if (parsedBuffer.payload === null) { | |
return parsedBuffer.payload; | |
} | |
bufferToParse = parsedBuffer.bufferRemainingBytes; | |
if (parsedBuffer.payload) { | |
console.log(parsedBuffer); | |
break; | |
} | |
} while (parsedBuffer.payload && parsedBuffer.bufferRemainingBytes.length); | |
return createWebSocketFrame(parsedBuffer.payload); | |
} | |
// https://stackoverflow.com/a/77398427 | |
async function digest(message, algo = "SHA-1") { | |
const bytes = new Uint8Array( | |
await crypto.subtle.digest( | |
algo, | |
new TextEncoder().encode( | |
`${message}258EAFA5-E914-47DA-95CA-C5AB0DC85B11`, | |
), | |
), | |
); | |
return btoa(String.fromCodePoint(...bytes)); | |
} | |
const abortable = new AbortController(); | |
const encoder = new TextEncoder(); | |
const decoder = new TextDecoder(); | |
const encode = (text) => encoder.encode(text); | |
const { | |
signal, | |
} = abortable; | |
async function onSocketHandler(socket) { | |
const { readable, writable } = Duplex.toWeb(socket); | |
const writer = writable.getWriter(); | |
await readable.pipeTo( | |
new WritableStream({ | |
async write(value) { | |
const request = decoder.decode(value); | |
console.log({ request }); | |
if (/Connection: Upgrade/i.test(request)) { | |
const headers = new Headers( | |
request.match(/.+/g).map((line) => line.split(/:\s|\s\/\s/)), | |
); | |
const key = headers.get("sec-websocket-key"); | |
const accept = await digest(key); | |
await writer.write( | |
encode("HTTP/1.1 101 Switching Protocols\r\n"), | |
); | |
await writer.write(encode("Upgrade: websocket\r\n")); | |
await writer.write(encode("Connection: Upgrade\r\n")); | |
await writer.write( | |
encode(`Sec-WebSocket-Accept: ${accept}\r\n\r\n`), | |
); | |
// Don't close WebSocket | |
// await writer.close(); | |
} else { | |
const response = parseWebSocketFrame(value); | |
// console.log({ response }); | |
if (response === null) { | |
// Close frame. From WebSocketStream client: await writer.close(); await writer.abort() | |
await writer.write(new Uint8Array([0x88, 0x00])); // 136, 0 | |
await writer.close(); | |
return await writer.closed; | |
// return abortable.abort("WebSocket closed by client, aborted in server"); | |
} | |
await writer.write(new Uint8Array(response)); | |
// Don't close WebSocket | |
// await writer.close(); | |
} | |
}, | |
}), | |
).catch(console.log); | |
} | |
const server = createServer({ | |
key: readFileSync( | |
"./certificate.key", | |
), | |
cert: readFileSync( | |
"./certificate.pem", | |
), | |
}, onSocketHandler); | |
server.on("error", console.log); | |
server.listen({ | |
port: 8000, | |
host: "0.0.0.0", | |
signal, | |
}, () => { | |
const { | |
address, | |
family, | |
port, | |
} = server.address(); | |
console.log( | |
`Listening on family: ${family}, address: ${address}, port: ${port}`, | |
); | |
}); | |
/* | |
var abortable = new AbortController(); | |
var { | |
signal, | |
} = abortable; | |
var wss = new WebSocketStream("wss://0.0.0.0:8000"); | |
console.log(wss); | |
wss.closed.catch((e) => {}); | |
var { | |
readable, | |
writable, | |
} = await wss.opened.catch(console.error); | |
var writer = writable.getWriter(); | |
// .pipeThrough(new TextDecoderStream()) | |
var socket = readable.pipeTo(new WritableStream({ | |
start(c) { | |
console.log("Start", c); | |
}, | |
async write(value) { | |
console.log(decoder.decode(value)); | |
}, | |
close() { | |
console.log("WritableStream closed"); | |
}, | |
abort(reason) { | |
console.log({ | |
reason | |
}); | |
}, | |
}), { | |
signal, | |
}, ); | |
socket.then(() => console.log("WebSocketStream complete")).catch(console.warn); | |
var encoder = new TextEncoder(); | |
var decoder = new TextDecoder(); | |
var encode = (text) => encoder.encode(text); | |
await writer.write(encode("a")); | |
await writer.write(encode("b")); | |
await writer.write(encode("c")); | |
// await writer.close(); | |
// await writer.abort(); | |
// writer.releaseLock(); await writable.abort(); | |
// abortable.abort("reason"); | |
*/ | |
/* | |
var wss = new WebSocket("wss://0.0.0.0:8000"); | |
wss.addEventListener("open", (e) => { | |
console.log(e); | |
wss.send(new Blob(["a"])); | |
wss.send(new TextEncoder().encode("b")); | |
wss.send(new TextEncoder().encode("c").buffer); | |
}); | |
wss.addEventListener("close", (e) => { | |
console.log(e); | |
}); | |
wss.addEventListener("error", (e) => { | |
console.log(e); | |
}); | |
wss.addEventListener("message", async (e) => { | |
console.log(await e.data.text()); | |
}); | |
wss.binaryType = "blob"; | |
// wss.close(); | |
*/ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment