Skip to content

Instantly share code, notes, and snippets.

@guest271314
Created April 26, 2025 05:59
Show Gist options
  • Save guest271314/87b689edc19646d18975cb74126d67d0 to your computer and use it in GitHub Desktop.
Save guest271314/87b689edc19646d18975cb74126d67d0 to your computer and use it in GitHub Desktop.
WebSocket server for Node.js
// Handles 16384*3 byte length input/output
import { createServer } from "node:net";
import { Duplex } from "node:stream";
// https://github.com/lsert/websocketparser
/**
 * Parses the WebSocket frame that starts at the first byte of `d`
 * (base framing layout of RFC 6455, section 5.2).
 *
 * @param {Uint8Array|number[]} d - Raw bytes beginning at a frame boundary.
 *   The input is never modified.
 * @returns {{FIN: number, mask: number, RSV1: number, RSV2: number,
 *   RSV3: number, opcode: number, realLens: number, data: Uint8Array}}
 *   `realLens` is the payload length declared by the header, always a
 *   Number (a 64-bit length above Number.MAX_SAFE_INTEGER loses precision).
 *   `data` holds the unmasked payload bytes that are actually present in
 *   `d`, capped at `realLens`; `data.length < realLens` therefore means the
 *   frame is incomplete, and bytes belonging to a following frame are never
 *   included.
 */
function wsparser(d) {
  const bytes = d;
  const firstByte = bytes[0];
  const secondByte = bytes[1];
  const FIN = firstByte >> 7;
  const RSV1 = (firstByte & 64) >> 6;
  const RSV2 = (firstByte & 32) >> 5;
  const RSV3 = (firstByte & 16) >> 4;
  const opcode = firstByte & 15;
  const mask = secondByte >> 7;
  const payloadLens = secondByte & 127;
  // Offset of the first byte after the header fields consumed so far
  // (compute where the data starts).
  let counter = 2;
  let realLens = payloadLens;
  if (payloadLens === 126) {
    // 16-bit big-endian extended length.
    realLens = (bytes[2] << 8) | bytes[3];
    counter += 2;
  } else if (payloadLens === 127) {
    // 64-bit big-endian extended length; missing bytes count as zero.
    let wideLength = 0n;
    for (let i = 0; i < 8; i++) {
      wideLength = (wideLength << 8n) | BigInt(bytes[counter + i] & 255);
    }
    realLens = Number(wideLength);
    counter += 8;
  }
  let maskKey = null;
  if (mask === 1) {
    maskKey = [
      bytes[counter],
      bytes[counter + 1],
      bytes[counter + 2],
      bytes[counter + 3],
    ];
    counter += 4;
  }
  // Copy only the bytes that belong to this frame and are already present.
  const available = Math.max(0, Math.min(realLens, bytes.length - counter));
  const data = new Uint8Array(available);
  for (let i = 0; i < available; i++) {
    const byte = bytes[counter + i];
    // Unmask: XOR each payload byte with the key byte at index i mod 4.
    data[i] = maskKey === null ? byte : byte ^ maskKey[i % 4];
  }
  return {
    FIN,
    mask,
    RSV1,
    RSV2,
    RSV3,
    opcode,
    realLens,
    data,
  };
}
// https://gist.github.com/robertrypula/b813ffe23a9489bae1b677f1608676c8
/**
 * Builds a single unmasked binary WebSocket frame (FIN set, opcode 0x2)
 * that carries `payload`.
 *
 * The length encoding is chosen from the payload size: the 7-bit field for
 * fewer than 126 bytes, a 16-bit extended length below 64 KiB, and a 64-bit
 * extended length otherwise.
 *
 * @param {Uint8Array|ArrayBuffer|number[]} payload - Bytes to send.
 * @returns {ArrayBuffer} The complete frame: header followed by the payload.
 */
const createWebSocketFrame = (payload) => {
  const bytes = payload instanceof ArrayBuffer
    ? new Uint8Array(payload)
    : payload;
  const payloadLength = bytes.length;
  let extendedLengthBytes = 0;
  if (payloadLength >= 0x10000) {
    extendedLengthBytes = 8;
  } else if (payloadLength >= 126) {
    extendedLengthBytes = 2;
  }
  const payloadOffset = 2 + extendedLengthBytes;
  const buffer = new ArrayBuffer(payloadOffset + payloadLength);
  const view = new DataView(buffer);
  // FIN = 1, RSV1-3 = 0, opcode = 0x2 (binary).
  view.setUint8(0, 0b10000010);
  if (extendedLengthBytes === 0) {
    view.setUint8(1, payloadLength);
  } else if (extendedLengthBytes === 2) {
    view.setUint8(1, 126);
    view.setUint16(2, payloadLength, false);
  } else {
    view.setUint8(1, 127);
    view.setBigUint64(2, BigInt(payloadLength), false);
  }
  // Bulk-copy the payload right behind the header.
  new Uint8Array(buffer, payloadOffset).set(bytes);
  return buffer;
};
// Handle WebSocket handshake
// https://stackoverflow.com/a/77398427
/**
 * Computes the value for the Sec-WebSocket-Accept response header.
 *
 * The fixed GUID "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" is appended to
 * `message`, the result is hashed with `algo`, and the raw hash bytes are
 * returned base64-encoded.
 *
 * @param {string} message - The client's Sec-WebSocket-Key header value.
 * @param {string} [algo="SHA-1"] - Algorithm name for crypto.subtle.digest.
 * @returns {Promise<string>} Base64 text of the hash.
 */
async function digest(message, algo = "SHA-1") {
  const salted = new TextEncoder().encode(
    `${message}258EAFA5-E914-47DA-95CA-C5AB0DC85B11`,
  );
  const hashBytes = new Uint8Array(await crypto.subtle.digest(algo, salted));
  // btoa expects a "binary string": one character per byte.
  let binary = "";
  for (const byte of hashBytes) {
    binary += String.fromCodePoint(byte);
  }
  return btoa(binary);
}
// Shared UTF-8 codec instances, reused for every connection.
const decoder = new TextDecoder();
const encoder = new TextEncoder();
// Convenience wrapper: string -> UTF-8 encoded Uint8Array.
const encode = (text) => {
  return encoder.encode(text);
};
// Controller whose signal is handed to server.listen() further down.
const abortable = new AbortController();
const { signal } = abortable;
/**
 * Serves one TCP connection as a WebSocket echo endpoint.
 *
 * The first chunk is treated as the HTTP upgrade request and answered with
 * "101 Switching Protocols" (or "400 Bad Request" when it is not a WebSocket
 * upgrade). After that, incoming bytes are buffered until a complete frame
 * is available, each frame is parsed with `wsparser`, and the payload of
 * every data frame is echoed back as binary frames of at most 16384 bytes.
 * Close frames are answered with a close frame and pings with a pong.
 *
 * All state lives in closure variables, so every connection has its own
 * handshake phase and receive buffer.
 *
 * @param {import("node:net").Socket} socket - The accepted connection.
 * @returns {Promise<void>} Resolves once the pipe is set up; stream errors
 *   are logged rather than thrown.
 */
async function onSocketHandler(socket) {
  const { readable, writable } = Duplex.toWeb(socket);
  const writer = writable.getWriter();
  // "handshake" -> "open" -> "closed"
  let phase = "handshake";
  // Received bytes that do not yet form a complete frame.
  // NOTE: no upper bound is enforced on the size of a buffered frame.
  let pending = new Uint8Array(0);

  // Total byte length (header + mask key + payload) of the frame starting at
  // bytes[0], or null while the header itself is still incomplete.
  const frameByteLength = (bytes) => {
    if (bytes.length < 2) {
      return null;
    }
    const lengthCode = bytes[1] & 127;
    const maskBytes = bytes[1] >> 7 === 1 ? 4 : 0;
    if (lengthCode < 126) {
      return 2 + maskBytes + lengthCode;
    }
    const extendedBytes = lengthCode === 126 ? 2 : 8;
    if (bytes.length < 2 + extendedBytes) {
      return null;
    }
    let payloadLength = 0;
    for (let i = 0; i < extendedBytes; i++) {
      payloadLength = payloadLength * 256 + bytes[2 + i];
    }
    return 2 + extendedBytes + maskBytes + payloadLength;
  };

  // Answers the HTTP upgrade request. Assumes the whole request arrives in
  // a single chunk.
  const handshake = async (value) => {
    const request = decoder.decode(value);
    console.log({ request });
    const headers = new Headers();
    // Skip the request line; stop at the blank line that ends the headers.
    for (const line of request.split(/\r?\n/).slice(1)) {
      if (line === "") {
        break;
      }
      const separator = line.indexOf(":");
      if (separator > 0) {
        headers.append(
          line.slice(0, separator).trim(),
          line.slice(separator + 1).trim(),
        );
      }
    }
    const key = headers.get("sec-websocket-key");
    const wantsWebSocket = /\bwebsocket\b/i.test(headers.get("upgrade") ?? "");
    if (!wantsWebSocket || key === null) {
      await writer.write(
        encode(
          "HTTP/1.1 400 Bad Request\r\nConnection: close\r\nContent-Length: 0\r\n\r\n",
        ),
      );
      phase = "closed";
      await writer.close();
      return;
    }
    const accept = await digest(key);
    await writer.write(
      encode(
        "HTTP/1.1 101 Switching Protocols\r\n" +
          "Upgrade: websocket\r\n" +
          "Connection: Upgrade\r\n" +
          `Sec-WebSocket-Accept: ${accept}\r\n\r\n`,
      ),
    );
    phase = "open";
  };

  // Handles exactly one complete frame.
  const handleFrame = async (frame) => {
    const parsed = wsparser(frame);
    console.log({ opcode: parsed.opcode, payloadLength: parsed.data.length });
    if (parsed.opcode === 0x8) {
      // Close: reply with a close frame that repeats the status code, if the
      // client sent one, then shut down our side of the connection.
      const status = parsed.data.length >= 2
        ? parsed.data.subarray(0, 2)
        : new Uint8Array(0);
      await writer.write(new Uint8Array([0x88, status.length, ...status]));
      phase = "closed";
      await writer.close();
      return;
    }
    if (parsed.opcode === 0x9) {
      // Ping: reply with a pong carrying the same payload. Control frame
      // payloads are limited to 125 bytes, so one length byte is enough.
      const body = parsed.data.subarray(0, 125);
      await writer.write(new Uint8Array([0x8a, body.length, ...body]));
      return;
    }
    if (parsed.opcode === 0xa) {
      // Pong: nothing to do.
      return;
    }
    // Data frame: echo the payload as binary frames of at most 16384 bytes.
    for (let i = 0; i < parsed.data.length; i += 16384) {
      await writer.write(
        new Uint8Array(createWebSocketFrame(parsed.data.subarray(i, i + 16384))),
      );
    }
  };

  readable.pipeTo(
    new WritableStream({
      write: async (value) => {
        if (phase === "closed") {
          return;
        }
        if (phase === "handshake") {
          await handshake(value);
          return;
        }
        if (pending.length === 0) {
          pending = value;
        } else {
          const joined = new Uint8Array(pending.length + value.length);
          joined.set(pending, 0);
          joined.set(value, pending.length);
          pending = joined;
        }
        // A chunk may complete one frame, several frames, or none.
        while (phase === "open") {
          const frameLength = frameByteLength(pending);
          if (frameLength === null || pending.length < frameLength) {
            break;
          }
          const frame = pending.subarray(0, frameLength);
          pending = pending.subarray(frameLength);
          await handleFrame(frame);
        }
      },
      close: async () => {
        console.log("Client close");
        // The peer ended its side without a close handshake; end ours too so
        // the socket is not left half-open.
        if (phase !== "closed") {
          phase = "closed";
          await writer.close();
        }
      },
    }),
  ).catch(console.log);
}
// Create the TCP server; every accepted connection is handed to
// onSocketHandler.
const server = createServer(
  { highWaterMark: 0, allowHalfOpen: true, noDelay: true },
  onSocketHandler,
);
// Server-level errors are only logged.
server.on("error", console.log);
// Listen on 0.0.0.0:8080; `signal` is passed through as a listen option.
server.listen({ port: 8080, host: "0.0.0.0", signal }, () => {
  const bound = server.address();
  console.log(
    `Listening on family: ${bound.family}, address: ${bound.address}, port: ${bound.port}`,
  );
});
/*
var wss = new WebSocketStream("ws://0.0.0.0:8080");
console.log(wss);
wss.closed.catch((e) => {
console.log(e)
});
var {
readable,
writable,
} = await wss.opened.catch(console.error);
var writer = writable.getWriter();
var abortable = new AbortController();
var {
signal,
} = abortable;
var len = 0;
var pipe = readable.pipeTo(new WritableStream({
start(c) {
console.log("Start", c);
},
async write(v) {
console.log(len += v.byteLength, v);
},
close() {
console.log("Socket closed");
},
abort(reason) {
console.log({
reason
});
},
}), {
signal,
}, ).then(() => console.log("Done")).catch((e) => console.log(e));
await writer.write(new ArrayBuffer(16384 * 3));
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment