Skip to content

Instantly share code, notes, and snippets.

@guest271314
Last active April 3, 2025 21:03
Show Gist options
  • Save guest271314/69c07d422811e82674a8d864a5412a7f to your computer and use it in GitHub Desktop.
Save guest271314/69c07d422811e82674a8d864a5412a7f to your computer and use it in GitHub Desktop.
WebSocket server using Node.js builtins and WHATWG Streams
#!/usr/bin/env -S node
// WebSocket server using Node.js builtins and WHATWG Streams
// https://gist.github.com/robertrypula/b813ffe23a9489bae1b677f1608676c8
// https://gist.github.com/guest271314/735377527389f1de6145f0ac71ca1e86
import { readFileSync } from "node:fs";
import { createServer } from "node:tls";
import { Duplex } from "node:stream";
const debugBuffer = (bufferName, buffer) => {
const length = buffer ? buffer.length : "---";
// console.log(`:: DEBUG - ${bufferName} | ${length} | `, buffer, "\n");
};
/*
https://tools.ietf.org/html/rfc6455#section-5.2
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| |Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
| Payload Data continued ... |
+---------------------------------------------------------------+
OpCode
%x0 denotes a continuation frame
%x1 denotes a text frame
%x2 denotes a binary frame
%x3–7 are reserved for further non-control frames
%x8 denotes a connection close
%x9 denotes a ping
%xA denotes a pong
%xB-F are reserved for further control frames
*/
// ---------------------------------------------------------
const createWebSocketFrame = (payload) => {
const payloadLengthByteCount = payload.length < 126 ? 0 : 2;
const buffer = new ArrayBuffer(2 + payloadLengthByteCount + payload.length);
const view = new DataView(buffer);
let payloadOffset = 2;
if (payload.length >= Math.pow(2, 16)) {
throw new Error("Payload equal or bigger than 64 KiB is not supported");
}
view.setUint8(0, 0b10000010);
view.setUint8(1, payload.length < 126 ? payload.length : 126);
if (payloadLengthByteCount > 0) {
view.setUint16(2, payload.length);
payloadOffset += payloadLengthByteCount;
}
for (let i = 0, j = payloadOffset; i < payload.length; i++, j++) {
view.setUint8(j, payload[i]);
}
return buffer;
};
// ---------------------------------------------------------
const getParsedBuffer = (buffer) => {
const view = new DataView(buffer.buffer);
let bufferRemainingBytes;
let currentOffset = 0;
let maskingKey;
let payload;
if (currentOffset + 2 > buffer.length) {
return { payload: null, bufferRemainingBytes: buffer };
}
const firstByte = view.getUint8(currentOffset++);
const secondByte = view.getUint8(currentOffset++);
const isFinalFrame = !!((firstByte >>> 7) & 0x1);
const opCode = firstByte & 0xf;
const isMasked = !!((secondByte >>> 7) & 0x1); // https://security.stackexchange.com/questions/113297
let payloadLength = secondByte & 0x7f;
if (!isFinalFrame) {
console.log("[not final frame detected]\n");
}
if (opCode === 0x8) {
console.log("[connection close frame]\n");
// TODO read payload, for example payload equal to <0x03 0xe9> means 1001:
// 1001 indicates that an endpoint is "going away", such as a server
// going down or a browser having navigated away from a page.
// More info here: https://tools.ietf.org/html/rfc6455#section-7.4
return { payload: null, bufferRemainingBytes: null };
}
if (opCode !== 0x2 && opCode !== 0x0) {
throw new Error("Only binary and continuation frames are supported");
}
if (payloadLength > 125) {
if (payloadLength === 126) {
if (currentOffset + 2 > buffer.length) {
return { payload: null, bufferRemainingBytes: buffer };
}
payloadLength = view.getUint16(currentOffset);
currentOffset += 2;
} else {
throw new Error("Payload equal or bigger than 64 KiB is not supported");
}
}
if (isMasked) {
if (currentOffset + 4 > buffer.length) {
return { payload: null, bufferRemainingBytes: buffer };
}
maskingKey = view.getUint32(currentOffset);
currentOffset += 4;
}
if (currentOffset + payloadLength > buffer.length) {
console.log("[misalignment between WebSocket frame and NodeJs Buffer]\n");
return { payload: null, bufferRemainingBytes: buffer };
}
payload = new Uint8Array(payloadLength);
if (isMasked) {
for (let i = 0, j = 0; i < payloadLength; ++i, j = i % 4) {
const shift = j === 3 ? 0 : (3 - j) << 3;
const mask = (shift === 0 ? maskingKey : maskingKey >>> shift) & 0xff;
payload[i] = mask ^ view.getUint8(currentOffset++);
}
} else {
for (let i = 0; i < payloadLength; i++) {
payload[i] = view.getUint8(currentOffset++);
}
}
bufferRemainingBytes = new Uint8Array(buffer.length - currentOffset);
for (let i = 0; i < bufferRemainingBytes.length; i++) {
bufferRemainingBytes[i] = view.getUint8(currentOffset++);
}
return { payload, bufferRemainingBytes };
};
function parseWebSocketFrame(buffer) {
let bufferToParse = buffer;
let parsedBuffer;
do {
parsedBuffer = getParsedBuffer(bufferToParse);
debugBuffer("buffer", buffer);
debugBuffer("bufferToParse", bufferToParse);
debugBuffer("parsedBuffer.payload", parsedBuffer.payload);
debugBuffer(
"parsedBuffer.bufferRemainingBytes",
parsedBuffer.bufferRemainingBytes,
);
if (parsedBuffer.payload === null) {
return parsedBuffer.payload;
}
bufferToParse = parsedBuffer.bufferRemainingBytes;
if (parsedBuffer.payload) {
console.log(parsedBuffer);
break;
}
} while (parsedBuffer.payload && parsedBuffer.bufferRemainingBytes.length);
return createWebSocketFrame(parsedBuffer.payload);
}
// https://stackoverflow.com/a/77398427
async function digest(message, algo = "SHA-1") {
const bytes = new Uint8Array(
await crypto.subtle.digest(
algo,
new TextEncoder().encode(
`${message}258EAFA5-E914-47DA-95CA-C5AB0DC85B11`,
),
),
);
return btoa(String.fromCodePoint(...bytes));
}
const abortable = new AbortController();
const encoder = new TextEncoder();
const decoder = new TextDecoder();
const encode = (text) => encoder.encode(text);
const {
signal,
} = abortable;
async function onSocketHandler(socket) {
const { readable, writable } = Duplex.toWeb(socket);
const writer = writable.getWriter();
await readable.pipeTo(
new WritableStream({
async write(value) {
const request = decoder.decode(value);
console.log({ request });
if (/Connection: Upgrade/i.test(request)) {
const headers = new Headers(
request.match(/.+/g).map((line) => line.split(/:\s|\s\/\s/)),
);
const key = headers.get("sec-websocket-key");
const accept = await digest(key);
await writer.write(
encode("HTTP/1.1 101 Switching Protocols\r\n"),
);
await writer.write(encode("Upgrade: websocket\r\n"));
await writer.write(encode("Connection: Upgrade\r\n"));
await writer.write(
encode(`Sec-WebSocket-Accept: ${accept}\r\n\r\n`),
);
// Don't close WebSocket
// await writer.close();
} else {
const response = parseWebSocketFrame(value);
// console.log({ response });
if (response === null) {
// Close frame. From WebSocketStream client: await writer.close(); await writer.abort()
await writer.write(new Uint8Array([0x88, 0x00])); // 136, 0
await writer.close();
return await writer.closed;
// return abortable.abort("WebSocket closed by client, aborted in server");
}
await writer.write(new Uint8Array(response));
// Don't close WebSocket
// await writer.close();
}
},
}),
).catch(console.log);
}
const server = createServer({
key: readFileSync(
"./certificate.key",
),
cert: readFileSync(
"./certificate.pem",
),
}, onSocketHandler);
server.on("error", console.log);
server.listen({
port: 8000,
host: "0.0.0.0",
signal,
}, () => {
const {
address,
family,
port,
} = server.address();
console.log(
`Listening on family: ${family}, address: ${address}, port: ${port}`,
);
});
/*
var abortable = new AbortController();
var {
signal,
} = abortable;
var wss = new WebSocketStream("wss://0.0.0.0:8000");
console.log(wss);
wss.closed.catch((e) => {});
var {
readable,
writable,
} = await wss.opened.catch(console.error);
var writer = writable.getWriter();
// .pipeThrough(new TextDecoderStream())
var socket = readable.pipeTo(new WritableStream({
start(c) {
console.log("Start", c);
},
async write(value) {
console.log(decoder.decode(value));
},
close() {
console.log("WritableStream closed");
},
abort(reason) {
console.log({
reason
});
},
}), {
signal,
}, );
socket.then(() => console.log("WebSocketStream complete")).catch(console.warn);
var encoder = new TextEncoder();
var decoder = new TextDecoder();
var encode = (text) => encoder.encode(text);
await writer.write(encode("a"));
await writer.write(encode("b"));
await writer.write(encode("c"));
// await writer.close();
// await writer.abort();
// writer.releaseLock(); await writable.abort();
// abortable.abort("reason");
*/
/*
var wss = new WebSocket("wss://0.0.0.0:8000");
wss.addEventListener("open", (e) => {
console.log(e);
wss.send(new Blob(["a"]));
wss.send(new TextEncoder().encode("b"));
wss.send(new TextEncoder().encode("c").buffer);
});
wss.addEventListener("close", (e) => {
console.log(e);
});
wss.addEventListener("error", (e) => {
console.log(e);
});
wss.addEventListener("message", async (e) => {
console.log(await e.data.text());
});
wss.binaryType = "blob";
// wss.close();
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment