Skip to content

Instantly share code, notes, and snippets.

@electricessence
Last active August 22, 2025 05:57
Show Gist options
  • Save electricessence/7521d414091d45d9158d10ff0541990d to your computer and use it in GitHub Desktop.
Save electricessence/7521d414091d45d9158d10ff0541990d to your computer and use it in GitHub Desktop.
Universal Web-Socket
// universal-websocket-base.ts
// Minimal WebSocket base with state-aware, timeoutable async connect()
// Built on @tsdotnet/disposable's AsyncDisposableBase.
//
// Deps:
// npm i rxjs @tsdotnet/disposable
//
// Implement in a subclass (Browser/Node/K6):
// protected _open(url: string, opts?: OpenOptions): void
// protected _send(data: WsMessage): void
// protected _close(code?: number, reason?: string): void
//
// Optional (if your transport needs async shutdown):
// protected override async _disposeTransportAsync(): Promise<void> { ... }
import {
  BehaviorSubject,
  Subject,
  Observable,
  TimeoutError,
  firstValueFrom,
  filter,
  timeout as rxTimeout,
  shareReplay,
} from 'rxjs';
import { AsyncDisposableBase } from '@tsdotnet/disposable';
/**
 * Payload type accepted by `send()` and emitted on `message$`.
 *
 * Node's `Buffer` is deliberately not named here: it is a `Uint8Array`
 * subclass, so Buffers remain assignable, while leaving the Node-only global
 * type out lets this module type-check in browser/k6 builds that do not ship
 * Node typings.
 */
export type WsMessage = string | ArrayBuffer | Uint8Array;
/**
 * Lifecycle phase of the socket as published on `state$`.
 *
 * - `idle`       — initial value, before any `connect()` call
 * - `connecting` — `connect()` has invoked the transport's `_open`
 * - `open`       — the transport reported that it opened
 * - `closing`    — `close()` was requested
 * - `closed`     — the transport reported a close, or `_open` threw
 */
export type ConnectionState =
  | 'idle'
  | 'connecting'
  | 'open'
  | 'closing'
  | 'closed';
/**
 * Optional settings handed unchanged to the transport's `_open` hook.
 * How (or whether) each field is honoured is up to the concrete transport.
 */
export interface OpenOptions {
  /**
   * A single protocol name or a list of names.
   * Presumably WebSocket sub-protocols — confirm against the transport in use.
   */
  protocols?: string | Array<string>;
  /**
   * String-to-string map.
   * Presumably extra handshake headers — confirm the transport supports them.
   */
  headers?: { [name: string]: string };
}
/**
 * Transport-agnostic WebSocket base class.
 *
 * Owns the connection state machine and the public observables; a concrete
 * subclass supplies the actual socket through the abstract `_open`, `_send`
 * and `_close` hooks and reports transport events back through the protected
 * `_emit*` methods.
 */
export abstract class UniversalWebSocketBase extends AsyncDisposableBase {
  // ---- Internal subjects (subclasses feed these via the _emit* helpers) ----
  protected readonly _state$ = new BehaviorSubject<ConnectionState>('idle');
  protected readonly _messages$ = new Subject<WsMessage>();
  protected readonly _errors$ = new Subject<unknown>();
  protected readonly _closes$ = new Subject<{ code?: number; reason?: string }>();

  // ---- Stable public observables ----
  /** Current connection state; the latest value is replayed to new subscribers. */
  public readonly state$: Observable<ConnectionState>;
  /** Every message passed to `_emitMessage`. */
  public readonly message$: Observable<WsMessage>;
  /** Every error passed to `_emitError`, plus errors thrown by `_open`. */
  public readonly error$: Observable<unknown>;
  /** One event per `_emitClose` call (an empty object when no details were given). */
  public readonly close$: Observable<{ code?: number; reason?: string }>;

  /** URL of the last requested connection (available to subclasses if needed). */
  protected targetUrl = '';
  /** Options of the last requested connection. */
  protected lastOpenOpts?: OpenOptions;

  /** In-flight connect promise, used to coalesce concurrent `connect()` calls. */
  private _pendingConnect?: Promise<void>;

  constructor() {
    super();
    this.state$ = this._state$.asObservable().pipe(shareReplay({ bufferSize: 1, refCount: true }));
    this.message$ = this._messages$.asObservable();
    this.error$ = this._errors$.asObservable();
    this.close$ = this._closes$.asObservable();
  }

  /**
   * State-aware, idempotent, timeoutable connect.
   * - if OPEN       -> records url/options and resolves immediately
   * - if CONNECTING -> returns the same in-flight promise
   * - if CLOSING    -> waits for CLOSED, then starts a fresh connect
   * - else          -> starts a connect
   *
   * @param url       Address handed to the transport's `_open`.
   * @param openOpts  Optional settings handed to the transport's `_open`.
   * @param timeoutMs How long to wait for the transport to report open or
   *                  closed before giving up (default 10 000 ms).
   * @returns Resolves once the state becomes `open`.
   * @throws Whatever `_open` throws (also published on `error$`);
   *         `Error('Socket closed before opening')` if the socket closes first;
   *         the rxjs `TimeoutError` if neither happens within `timeoutMs`.
   */
  public async connect(url: string, openOpts?: OpenOptions, timeoutMs = 10_000): Promise<void> {
    this._throwIfDisposed();

    // Fast path: already open.
    if (this._state$.value === 'open') {
      this.targetUrl = url;
      this.lastOpenOpts = openOpts;
      return;
    }

    // If closing, wait for closed before attempting.
    if (this._state$.value === 'closing') {
      await firstValueFrom(this.state$.pipe(filter(s => s === 'closed')));
      // Disposal may have started while we were suspended.
      this._throwIfDisposed();
    }

    // If already connecting, coalesce onto the in-flight attempt.
    if (this._state$.value === 'connecting' && this._pendingConnect) {
      return this._pendingConnect;
    }

    // Fresh attempt.
    this.targetUrl = url;
    this.lastOpenOpts = openOpts;
    this._state$.next('connecting');
    try {
      this._open(url, openOpts);
    } catch (e) {
      this._errors$.next(e);
      this._state$.next('closed');
      throw e;
    }

    // Store the SAME promise that the cleanup compares against. (Comparing the
    // stored `.finally()` wrapper with the inner promise never matches, which
    // would leave a settled promise in the slot and make later connect() calls
    // return a stale rejection.)
    const pending: Promise<void> = this._awaitOpen(timeoutMs).finally(() => {
      if (this._pendingConnect === pending) this._pendingConnect = undefined;
    });
    this._pendingConnect = pending;
    return pending;
  }

  /** Send a data frame (throws if underlying transport rejects). */
  public send(data: WsMessage): void {
    // NOTE(review): this class uses `_throwIfDisposed`, `assertIsAlive` and
    // `disposeState` for disposal checks — confirm all three exist on the
    // installed AsyncDisposableBase version.
    this.assertIsAlive();
    this._send(data);
  }

  /**
   * Request a graceful close.
   *
   * Does nothing once disposal has begun, or when there is nothing to close
   * (`idle` / `closed`): moving to `closing` without a live transport would
   * leave the state stuck there, because no close event would ever follow,
   * and a later `connect()` would wait for `closed` forever.
   *
   * @param code   Optional close code forwarded to the transport.
   * @param reason Optional close reason forwarded to the transport.
   */
  public close(code?: number, reason?: string): void {
    if (this.disposeState > 0) return;
    const current = this._state$.value;
    if (current === 'idle' || current === 'closed') return;
    this._state$.next('closing');
    try {
      this._close(code, reason);
    } catch (e) {
      this._emitError(e);
    }
  }

  // ---- Abstract hooks for concrete transports ----
  /** Start opening the underlying transport; report the outcome via `_emitOpen` / `_emitClose`. */
  protected abstract _open(url: string, opts?: OpenOptions): void;
  /** Transmit one frame over the underlying transport. */
  protected abstract _send(data: WsMessage): void;
  /** Start closing the underlying transport. */
  protected abstract _close(code?: number, reason?: string): void;

  // ---- Emitters for transports to call on events ----
  /** Call when underlying transport signals 'open'. */
  protected _emitOpen(): void {
    this._state$.next('open');
  }

  /** Call for each received message. */
  protected _emitMessage(msg: WsMessage): void {
    this._messages$.next(msg);
  }

  /** Call on error (non-terminal). */
  protected _emitError(err: unknown): void {
    this._errors$.next(err);
  }

  /** Call when underlying transport closes. */
  protected _emitClose(evt?: { code?: number; reason?: string }): void {
    this._state$.next('closed');
    this._closes$.next(evt ?? {});
  }

  // ---- Async disposal ----
  // NOTE: Assuming AsyncDisposableBase calls protected async _onDisposeAsync().
  // If your version uses a different hook name, rename accordingly.
  protected override async _onDisposeAsync(): Promise<void> {
    try {
      await this._disposeTransportAsync();
    } catch { /* swallow during dispose */ }
    // Complete subjects after transport teardown
    this._messages$.complete();
    this._errors$.complete();
    this._closes$.complete();
    this._state$.complete();
    // Drop any pending connect promise (callers should handle rejection/await order)
    this._pendingConnect = undefined;
  }

  /**
   * Override if the transport needs asynchronous shutdown.
   * Default: best-effort sync close.
   */
  // eslint-disable-next-line @typescript-eslint/no-unused-vars
  protected async _disposeTransportAsync(): Promise<void> {
    try {
      this._close(1000, 'dispose');
    } catch {
      /* deliberate best-effort: nothing useful to do with a close failure during dispose */
    }
  }

  /**
   * Waits until the state becomes `open` or `closed`, bounded by `timeoutMs`.
   * Resolves on `open`, rejects on `closed`; on timeout it abandons the
   * attempt before rethrowing so the socket is not left in `connecting`.
   */
  private async _awaitOpen(timeoutMs: number): Promise<void> {
    let terminal: ConnectionState;
    try {
      terminal = await firstValueFrom(
        this.state$.pipe(
          filter(s => s === 'open' || s === 'closed'),
          rxTimeout({ each: timeoutMs })
        )
      );
    } catch (e) {
      if (e instanceof TimeoutError) this._abandonConnect();
      throw e;
    }
    if (terminal !== 'open') throw new Error('Socket closed before opening');
  }

  /**
   * Tears down a connect attempt that timed out: asks the transport to close
   * (best effort) and marks the socket `closed` so a later `connect()` starts
   * cleanly instead of stacking a second transport on a half-open one.
   */
  private _abandonConnect(): void {
    if (this._state$.value !== 'connecting') return;
    try {
      this._close();
    } catch (e) {
      this._errors$.next(e);
    }
    // `_close` may already have reported the close synchronously.
    if (this._state$.value === 'connecting') this._state$.next('closed');
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment