Last active
March 11, 2024 17:40
-
-
Save douglascayers/3ff755f817440462c225bef1882684f0 to your computer and use it in GitHub Desktop.
PubSubService with RXJS
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Observable, Subject, filter } from 'rxjs'; | |
import { PubSubMessage, PubSubService, PubSubTopic } from '../pubsub.types'; | |
export class PubSubServiceMock implements PubSubService { | |
public readonly subject$ = new Subject<PubSubMessage<PubSubTopic>>(); | |
public readonly publishSpy = jest.fn(); | |
public readonly streamSpy = jest.fn(); | |
public publish<T extends PubSubTopic>(message: PubSubMessage<T>): void { | |
this.publishSpy(message); | |
this.subject$.next(message); | |
} | |
public stream<T extends PubSubTopic>(topic: T): Observable<PubSubMessage<T>> { | |
this.streamSpy(topic); | |
const filtered$ = this.subject$.pipe( | |
filter((message) => { | |
return message.topic === topic; | |
}) | |
); | |
return filtered$ as Observable<PubSubMessage<T>>; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Injectable, OnApplicationShutdown } from '@nestjs/common'; | |
import { Observable, Subject, filter } from 'rxjs'; | |
import { PubSubMessage, PubSubService, PubSubTopic } from './pubsub.types'; | |
/** | |
* A service for publishing messages within the application. | |
* Not to be confused with AWS IoT or Graphql PubSubEngine. | |
*/ | |
@Injectable() | |
class PubSubServiceImpl implements PubSubService, OnApplicationShutdown { | |
private subject$: Subject<PubSubMessage<PubSubTopic>>; | |
constructor() { | |
this.subject$ = new Subject<PubSubMessage<PubSubTopic>>(); | |
} | |
public onApplicationShutdown(): void { | |
this.subject$.complete(); | |
} | |
/** | |
* Publish a message to a specific topic. | |
*/ | |
public publish<T extends PubSubTopic>(message: PubSubMessage<T>): void { | |
this.subject$.next(message); | |
} | |
/** | |
* Get an observable for a specific topic. | |
* Subscribe to the observable to begin receiving messages. | |
* Only messages published after you've subscribed will be received. | |
*/ | |
public stream<T extends PubSubTopic>(topic: T): Observable<PubSubMessage<T>> { | |
const filtered$ = this.subject$.pipe( | |
filter((message) => { | |
return message.topic === topic; | |
}) | |
); | |
return filtered$ as Observable<PubSubMessage<T>>; | |
} | |
} | |
export { PubSubServiceImpl }; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Observable } from 'rxjs'; | |
// -------------------------------------------------------------------------- | |
// Topics | |
// -------------------------------------------------------------------------- | |
export enum PubSubTopic { | |
TopicA = 'topic_a', | |
TopicB = 'topic_b', | |
TopicC = 'topic_c', | |
} | |
// -------------------------------------------------------------------------- | |
// Abstract Messages | |
// -------------------------------------------------------------------------- | |
/** | |
* Defines the base structure of a pubsub message. | |
* Not really designed to be used directly. | |
* Prefer using `PubSubMessage<T>` or the types that extend from this one. | |
*/ | |
export interface PubSubBaseMessage<T extends PubSubTopic, P = unknown> { | |
topic: T; | |
payload: P; | |
} | |
/** | |
* Infers the concrete message type based on the topic you're using. | |
* As new topics and messages are created, wire them up here. | |
*/ | |
export type PubSubMessage<T extends PubSubTopic> = T extends PubSubTopic.TopicA | |
? PubSubTopicAMessage | |
: T extends PubSubTopic.TopicB | |
? PubSubTopicBMessage | |
: T extends PubSubTopic.TopicC | |
? PubSubTopicCMessage | |
: PubSubBaseMessage<T>; | |
// -------------------------------------------------------------------------- | |
// Messages | |
// -------------------------------------------------------------------------- | |
export interface PubSubTopicAMessage | |
extends PubSubBaseMessage<PubSubTopic.TopicA, PubSubTopicAPayload> {} | |
export interface PubSubTopicAPayload { | |
// ... | |
} | |
// -------------------------------------------------------------------------- | |
export interface PubSubTopicBMessage | |
extends PubSubBaseMessage<PubSubTopic.TopicB, PubSubTopicBPayload> {} | |
export interface PubSubTopicBPayload { | |
// ... | |
} | |
// -------------------------------------------------------------------------- | |
export interface PubSubTopicCMessage | |
extends PubSubBaseMessage<PubSubTopic.TopicC, PubSubTopicCPayload> {} | |
export interface PubSubTopicCPayload { | |
// ... | |
} | |
// -------------------------------------------------------------------------- | |
// Services | |
// -------------------------------------------------------------------------- | |
/** | |
* A service for publishing messages within the application. | |
* Not to be confused with AWS IoT or Graphql PubSubEngine. | |
* | |
* The service's main purpose is to make it easier to pubsub data | |
* while staying decoupled from the publisher and subscriber. | |
*/ | |
export interface PubSubService { | |
/** | |
* Publish a message to a specific topic. | |
*/ | |
publish<T extends PubSubTopic>(message: PubSubMessage<T>): void; | |
/** | |
* Get an observable for a specific topic. | |
* Subscribe to the observable to begin receiving messages. | |
* Only messages published after you've subscribed will be received. | |
*/ | |
stream<T extends PubSubTopic>(topic: T): Observable<PubSubMessage<T>>; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment