Last active
August 22, 2025 05:57
-
-
Save electricessence/7521d414091d45d9158d10ff0541990d to your computer and use it in GitHub Desktop.
Universal Web-Socket
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// universal-websocket-base.ts | |
// Minimal WebSocket base with state-aware, timeoutable async connect() | |
// Built on @tsdotnet/disposable's AsyncDisposableBase. | |
// | |
// Deps: | |
// npm i rxjs @tsdotnet/disposable | |
// | |
// Implement in a subclass (Browser/Node/K6): | |
// protected _open(url: string, opts?: OpenOptions): void | |
// protected _send(data: WsMessage): void | |
// protected _close(code?: number, reason?: string): void | |
// | |
// Optional (if your transport needs async shutdown): | |
// protected override async _disposeTransportAsync(): Promise<void> { ... } | |
import { | |
BehaviorSubject, | |
Subject, | |
Observable, | |
firstValueFrom, | |
filter, | |
timeout as rxTimeout, | |
shareReplay, | |
} from 'rxjs'; | |
import { AsyncDisposableBase } from '@tsdotnet/disposable'; | |
export type WsMessage = string | ArrayBuffer | Uint8Array | Buffer; | |
export type ConnectionState = 'idle' | 'connecting' | 'open' | 'closing' | 'closed'; | |
export interface OpenOptions { | |
protocols?: string | string[]; | |
headers?: Record<string, string>; | |
} | |
export abstract class UniversalWebSocketBase extends AsyncDisposableBase { | |
// Internal subjects | |
protected readonly _state$ = new BehaviorSubject<ConnectionState>('idle'); | |
protected readonly _messages$ = new Subject<WsMessage>(); | |
protected readonly _errors$ = new Subject<unknown>(); | |
protected readonly _closes$ = new Subject<{ code?: number; reason?: string }>(); | |
// Stable public observables | |
public readonly state$: Observable<ConnectionState>; | |
public readonly message$: Observable<WsMessage>; | |
public readonly error$: Observable<unknown>; | |
public readonly close$: Observable<{ code?: number; reason?: string }>; | |
// Last requested connection (available to subclasses if needed) | |
protected targetUrl = ''; | |
protected lastOpenOpts?: OpenOptions; | |
// Coalesce concurrent connect() calls | |
private _pendingConnect?: Promise<void>; | |
constructor() { | |
super(); | |
this.state$ = this._state$.asObservable().pipe(shareReplay({ bufferSize: 1, refCount: true })); | |
this.message$ = this._messages$.asObservable(); | |
this.error$ = this._errors$.asObservable(); | |
this.close$ = this._closes$.asObservable(); | |
} | |
/** | |
* State-aware, idempotent, timeoutable connect. | |
* - if OPEN -> resolves immediately | |
* - if CONNECTING -> returns same in-flight promise | |
* - if CLOSING -> waits for CLOSED, then starts fresh connect | |
* - else -> starts connect | |
*/ | |
public async connect(url: string, openOpts?: OpenOptions, timeoutMs = 10_000): Promise<void> { | |
this._throwIfDisposed(); | |
// Fast-path already open | |
if (this._state$.value === 'open') { | |
this.targetUrl = url; | |
this.lastOpenOpts = openOpts; | |
return; | |
} | |
// If closing, wait for closed before attempting | |
if (this._state$.value === 'closing') { | |
await firstValueFrom(this.state$.pipe(filter(s => s === 'closed'))); | |
} | |
// If already connecting, coalesce | |
if (this._state$.value === 'connecting' && this._pendingConnect) { | |
return this._pendingConnect; | |
} | |
// Fresh attempt | |
this.targetUrl = url; | |
this.lastOpenOpts = openOpts; | |
this._state$.next('connecting'); | |
try { | |
this._open(url, openOpts); | |
} catch (e) { | |
this._errors$.next(e); | |
this._state$.next('closed'); | |
throw e; | |
} | |
const attempt = (async () => { | |
const terminal = await firstValueFrom( | |
this.state$.pipe( | |
filter(s => s === 'open' || s === 'closed'), | |
rxTimeout({ each: timeoutMs }) | |
) | |
); | |
if (terminal !== 'open') throw new Error('Socket closed before opening'); | |
})(); | |
this._pendingConnect = attempt.finally(() => { | |
if (this._pendingConnect === attempt) this._pendingConnect = undefined; | |
}); | |
return this._pendingConnect; | |
} | |
/** Send a data frame (throws if underlying transport rejects). */ | |
public send(data: WsMessage): void { | |
this.assertIsAlive(); | |
this._send(data); | |
} | |
/** Request a graceful close. */ | |
public close(code?: number, reason?: string): void { | |
if (this.disposeState>0) return; | |
this._state$.next('closing'); | |
try { this._close(code, reason); } | |
catch (e) { this._emitError(e); } | |
} | |
// ---- Abstract hooks for concrete transports ---- | |
protected abstract _open(url: string, opts?: OpenOptions): void; | |
protected abstract _send(data: WsMessage): void; | |
protected abstract _close(code?: number, reason?: string): void; | |
// ---- Emitters for transports to call on events ---- | |
/** Call when underlying transport signals 'open'. */ | |
protected _emitOpen(): void { this._state$.next('open'); } | |
/** Call for each received message. */ | |
protected _emitMessage(msg: WsMessage): void { this._messages$.next(msg); } | |
/** Call on error (non-terminal). */ | |
protected _emitError(err: unknown): void { this._errors$.next(err); } | |
/** Call when underlying transport closes. */ | |
protected _emitClose(evt?: { code?: number; reason?: string }): void { | |
this._state$.next('closed'); | |
this._closes$.next(evt ?? {}); | |
} | |
// ---- Async disposal ---- | |
// NOTE: Assuming AsyncDisposableBase calls protected async _onDisposeAsync(). | |
// If your version uses a different hook name, rename accordingly. | |
protected override async _onDisposeAsync(): Promise<void> { | |
try { | |
await this._disposeTransportAsync(); | |
} catch { /* swallow during dispose */ } | |
// Complete subjects after transport teardown | |
this._messages$.complete(); | |
this._errors$.complete(); | |
this._closes$.complete(); | |
this._state$.complete(); | |
// Drop any pending connect promise (callers should handle rejection/await order) | |
this._pendingConnect = undefined; | |
} | |
/** | |
* Override if the transport needs asynchronous shutdown. | |
* Default: best-effort sync close. | |
*/ | |
// eslint-disable-next-line @typescript-eslint/no-unused-vars | |
protected async _disposeTransportAsync(): Promise<void> { | |
try { this._close(1000, 'dispose'); } catch {} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment