Created
April 26, 2025 05:59
-
-
Save guest271314/87b689edc19646d18975cb74126d67d0 to your computer and use it in GitHub Desktop.
WebSocket server for Node.js
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Handles 16384*3 byte length input/output | |
import { createServer } from "node:net"; | |
import { Duplex } from "node:stream"; | |
// https://github.com/lsert/websocketparser | |
function wsparser(d) { | |
const tArr = d; | |
const group0 = tArr[0]; | |
const FIN = group0 >> 7; | |
const RSV1 = (group0 & 64) >> 6; | |
const RSV2 = (group0 & 32) >> 5; | |
const RSV3 = (group0 & 16) >> 4; | |
const opcode = group0 & 15; | |
const group1 = tArr[1]; | |
const group2 = tArr[2]; | |
const group3 = tArr[3]; | |
// 计算数据开始点 | |
let counter = 2; | |
const mask = group1 >> 7; | |
const payloadLens = group1 & 127; | |
let realLens = payloadLens; | |
if (payloadLens === 126) { | |
realLens = (group2 << 8) | group3; | |
counter += 2; | |
} else if (payloadLens === 127) { | |
const ab = new ArrayBuffer(8); | |
const dv = new DataView(ab); | |
for (let i = 0; i < 8; i++) { | |
dv.setUint8(i, tArr[i + 2]); | |
} | |
realLens = dv.getBigUint64(0, false); | |
counter += 8; | |
} | |
let data; | |
if (mask === 1) { | |
const maskKeyArr = tArr.slice(counter, counter + 4); | |
counter += 4; | |
// 解掩码 | |
data = tArr.slice(counter).map((item, index) => { | |
const j = index % 4; | |
return item ^ maskKeyArr[j]; | |
}); | |
} else { | |
data = tArr.slice(counter); | |
} | |
let arr = { | |
FIN, | |
mask, | |
RSV1, | |
RSV2, | |
RSV3, | |
opcode, | |
realLens, | |
data: new Uint8Array(data), | |
}; | |
return arr; | |
} | |
// https://gist.github.com/robertrypula/b813ffe23a9489bae1b677f1608676c8 | |
const createWebSocketFrame = (payload) => { | |
const payloadLengthByteCount = payload.length < 126 ? 0 : 2; | |
const buffer = new ArrayBuffer(2 + payloadLengthByteCount + payload.length); | |
const view = new DataView(buffer); | |
let payloadOffset = 2; | |
console.log(payload.length, payloadLengthByteCount); | |
// TODO: Handle greater than 65536 input ArrayBuffer byteLength (guest271314) | |
if (payload.length >= Math.pow(2, 16)) { | |
throw new Error("Payload equal or bigger than 64 KiB is not supported"); | |
} | |
view.setUint8(0, 0b10000010); | |
view.setUint8(1, payload.length < 126 ? payload.length : 126); | |
if (payloadLengthByteCount > 0) { | |
view.setUint16(2, payload.length, false); | |
payloadOffset += payloadLengthByteCount; | |
} | |
for (let i = 0, j = payloadOffset; i < payload.length; i++, j++) { | |
view.setUint8(j, payload[i]); | |
} | |
return buffer; | |
}; | |
// Handle WebSocket handshake | |
// https://stackoverflow.com/a/77398427 | |
async function digest(message, algo = "SHA-1") { | |
return btoa( | |
[ | |
...new Uint8Array( | |
await crypto.subtle.digest( | |
algo, | |
new TextEncoder().encode( | |
`${message}258EAFA5-E914-47DA-95CA-C5AB0DC85B11`, | |
), | |
), | |
), | |
].map((s) => String.fromCodePoint(s)).join(""), | |
); | |
} | |
const abortable = new AbortController(); | |
const encoder = new TextEncoder(); | |
const decoder = new TextDecoder(); | |
const encode = (text) => encoder.encode(text); | |
const { | |
signal, | |
} = abortable; | |
async function onSocketHandler(socket) { | |
const { readable, writable } = Duplex.toWeb(socket); | |
const writer = writable.getWriter(); | |
readable.pipeTo( | |
new WritableStream({ | |
start: () => { | |
this.fragments = []; | |
}, | |
write: async (value) => { | |
const request = decoder.decode(value); | |
console.log({ request }); | |
if (/Connection: Upgrade/i.test(request)) { | |
const headers = new Headers( | |
request.match(/.+/g).map((line) => line.split(/:\s|\s\/\s/)), | |
); | |
const key = headers.get("sec-websocket-key"); | |
const accept = await digest(key); | |
await writer.write( | |
encode("HTTP/1.1 101 Switching Protocols\r\n"), | |
); | |
await writer.write(encode("Upgrade: websocket\r\n")); | |
await writer.write(encode("Connection: Upgrade\r\n")); | |
await writer.write( | |
encode(`Sec-WebSocket-Accept: ${accept}\r\n\r\n`), | |
); | |
// Don't close WebSocket | |
// await writer.close(); | |
} else { | |
let parsed = wsparser(value); | |
console.log( | |
{ | |
value, | |
}, | |
parsed.data.length, | |
parsed.realLens, | |
); | |
// Handle fragmented frames up to 65536 byteLength | |
// from input ArrayBuffer | |
if (parsed.data.length < parsed.realLens) { | |
this.fragments.push(parsed.data); | |
console.log(this.fragments); | |
return; | |
} | |
if (this.fragments.length) { | |
const data = []; | |
do { | |
for (let i = 0; i < this.fragments.length; i++) { | |
data.push(...this.fragments.shift()); | |
} | |
} while (this.fragments.length); | |
parsed.data = new Uint8Array([...data, ...value]); | |
console.log({ | |
parsed, | |
}); | |
} | |
// Handle client closing WebSocket | |
if (parsed.opcode === 0x8 && parsed.data.length === 0) { | |
console.log(parsed.opcode, parsed.data); | |
await writer.write(new Uint8Array([0x88, 0x00])); | |
// 136, 0 | |
await writer.close(); | |
return await writer.closed; | |
} | |
// Write 16384 bytes | |
for (let i = 0; i < parsed.data.length; i += 16384) { | |
await writer.write( | |
new Uint8Array(createWebSocketFrame(parsed.data.subarray(i, i + 16384))), | |
); | |
} | |
// Don't close WebSocket | |
// await writer.close(); | |
} | |
}, | |
close() { | |
console.log("Client close"); | |
}, | |
}), | |
).catch(console.log); | |
} | |
const server = createServer({ | |
highWaterMark: 0, | |
allowHalfOpen: true, | |
noDelay: true, | |
}, onSocketHandler); | |
server.on("error", console.log); | |
server.listen({ | |
port: 8080, | |
host: "0.0.0.0", | |
signal, | |
}, () => { | |
const { | |
address, | |
family, | |
port, | |
} = server.address(); | |
console.log( | |
`Listening on family: ${family}, address: ${address}, port: ${port}`, | |
); | |
}); | |
/* | |
var wss = new WebSocketStream("ws://0.0.0.0:8080"); | |
console.log(wss); | |
wss.closed.catch((e) => { | |
console.log(e) | |
}); | |
var { | |
readable, | |
writable, | |
} = await wss.opened.catch(console.error); | |
var writer = writable.getWriter(); | |
var abortable = new AbortController(); | |
var { | |
signal, | |
} = abortable; | |
var len = 0; | |
var pipe = readable.pipeTo(new WritableStream({ | |
start(c) { | |
console.log("Start", c); | |
}, | |
async write(v) { | |
console.log(len += v.byteLength, v); | |
}, | |
close() { | |
console.log("Socket closed"); | |
}, | |
abort(reason) { | |
console.log({ | |
reason | |
}); | |
}, | |
}), { | |
signal, | |
}, ).then(() => console.log("Done")).catch((e) => console.log(e)); | |
await writer.write(new ArrayBuffer(16384 * 3)); | |
*/ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment