Last active
March 8, 2023 20:36
-
-
Save barisbll/3b22161674b25a561f3b4e4904ad3672 to your computer and use it in GitHub Desktop.
TypeScript RPC with Observables Written By ChatGPT
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { randomUUID } from 'node:crypto';

import * as amqp from 'amqplib';
import { Observable, Observer } from 'rxjs';
/**
 * Bookkeeping record for one in-flight RPC call.
 *
 * The client stores one of these per outstanding request so that the reply,
 * which carries the same correlation id, can be routed back to the caller.
 */
interface RpcRequest {
  /** Identifier attached to the outgoing message and echoed in the reply. */
  correlationId: string;
  /**
   * Delivers the decoded reply payload to the caller.
   *
   * The payload is whatever `JSON.parse` produced, so it is left as `any`:
   * callers bind a handler typed with their own response type here.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  resolve: (response: any) => void;
  /** Reports a failure for this call to the caller. */
  reject: (error: Error) => void;
}
/**
 * RPC client that publishes JSON requests to RabbitMQ and exposes every call
 * as an RxJS Observable.
 *
 * Connection and channel setup start in the constructor and run
 * asynchronously; `callRpcServer` waits for that setup before publishing, so
 * it is safe to call immediately after construction.
 */
class RabbitMQRpcClient {
  private connection?: amqp.Connection;
  private channel?: amqp.Channel;
  private readonly exchangeName: string;
  private readonly requestQueueName: string;
  private readonly responseQueueName: string;
  /** Calls still waiting for a reply, keyed by correlation id. */
  private readonly rpcRequests = new Map<string, RpcRequest>();
  /** Resolves once the channel, queues and consumer are set up; rejects if setup failed. */
  private readonly ready: Promise<void>;

  /**
   * @param serverUrls AMQP URLs to try, in order; the first one that accepts
   *   a connection is used.
   */
  constructor(private readonly serverUrls: string[]) {
    this.exchangeName = 'rpc_exchange';
    this.requestQueueName = 'rpc_request_queue';
    this.responseQueueName = 'rpc_response_queue';
    this.ready = this.initialize();
    // Setup failures are reported to each subscriber in callRpcServer. This
    // no-op handler only stops Node from treating the rejection as unhandled
    // when nobody has subscribed yet.
    this.ready.catch(() => undefined);
  }

  /**
   * Opens the connection and channel, declares the exchange and queues, and
   * starts consuming replies.
   *
   * @throws Error when no URL is reachable or any setup step fails.
   */
  private async initialize(): Promise<void> {
    const connection = await this.connectToFirstAvailable();
    try {
      const channel = await connection.createChannel();
      this.connection = connection;
      this.channel = channel;

      // Create the exchange and the queues.
      // NOTE(review): the request queue is declared `exclusive` by the client,
      // which would normally prevent a server on another connection from
      // consuming it, and the fixed-name exclusive response queue allows only
      // one client at a time. Left unchanged because the server side is not
      // visible here; confirm the intended topology.
      await channel.assertExchange(this.exchangeName, 'direct', { durable: false });
      await channel.assertQueue(this.requestQueueName, { exclusive: true });
      await channel.assertQueue(this.responseQueueName, { exclusive: true });

      // Bind the queues to the exchange, using the queue name as routing key.
      await channel.bindQueue(this.requestQueueName, this.exchangeName, this.requestQueueName);
      await channel.bindQueue(this.responseQueueName, this.exchangeName, this.responseQueueName);

      // Consume replies; noAck because a reply is useless once it has been
      // handed to (or dropped for) its caller.
      await channel.consume(
        this.responseQueueName,
        (message) => this.handleResponse(message),
        { noAck: true },
      );
    } catch (error: unknown) {
      // Do not leave the socket open when setup fails half-way.
      await connection.close().catch(() => undefined);
      throw error;
    }
  }

  /**
   * Tries each configured URL in order and returns the first connection that
   * succeeds. The attempts are sequential on purpose: later URLs are fallbacks.
   *
   * @throws Error the last connection error, or a descriptive error when the
   *   URL list is empty.
   */
  private async connectToFirstAvailable(): Promise<amqp.Connection> {
    let lastError: unknown = new Error('No RabbitMQ server URLs were provided');
    for (const url of this.serverUrls) {
      try {
        return await amqp.connect(url);
      } catch (error: unknown) {
        lastError = error;
      }
    }
    throw RabbitMQRpcClient.toError(lastError);
  }

  /**
   * Routes one message from the response queue to the call that is waiting
   * for it. Messages with an unknown correlation id are ignored.
   *
   * @param message The delivered message, or `null` when the broker cancelled
   *   the consumer.
   */
  private handleResponse(message: amqp.ConsumeMessage | null): void {
    if (message === null) {
      return;
    }
    const correlationId: unknown = message.properties.correlationId;
    if (typeof correlationId !== 'string') {
      return;
    }
    const pending = this.rpcRequests.get(correlationId);
    if (pending === undefined) {
      return;
    }
    // Remove first so the entry is gone even if a handler below throws.
    this.rpcRequests.delete(correlationId);

    let response: unknown;
    try {
      response = JSON.parse(message.content.toString());
    } catch (error: unknown) {
      // A malformed reply fails only its own call instead of crashing the consumer.
      pending.reject(RabbitMQRpcClient.toError(error));
      return;
    }
    pending.resolve(response);
  }

  /**
   * Sends an RPC request and returns an Observable for its reply.
   *
   * Nothing is sent until the Observable is subscribed to. The Observable
   * emits the decoded reply once and then completes. It errors if the client
   * could not be set up, if the request could not be serialized or published,
   * or if the reply is not valid JSON. Unsubscribing before the reply arrives
   * discards the pending call.
   *
   * @param serviceName Name of the remote service, sent in the request body.
   * @param methodName Name of the remote method, sent in the request body.
   * @param request Payload for the call; must be JSON-serializable.
   * @returns An Observable of the reply. The reply is not validated against
   *   `TResponse`.
   */
  public callRpcServer<TRequest, TResponse>(serviceName: string, methodName: string, request: TRequest): Observable<TResponse> {
    return new Observable<TResponse>((observer: Observer<TResponse>) => {
      const correlationId = this.generateUuid();
      let cancelled = false;

      this.rpcRequests.set(correlationId, {
        correlationId,
        resolve: (response: TResponse) => {
          observer.next(response);
          // One request yields exactly one reply, so finish the stream.
          observer.complete();
        },
        reject: (error: Error) => observer.error(error),
      });

      this.ready
        .then(() => {
          if (cancelled) {
            return;
          }
          const channel = this.channel;
          if (channel === undefined) {
            throw new Error('RabbitMQ channel is not available');
          }
          // Send the request to the request queue.
          const payload = Buffer.from(JSON.stringify({ serviceName, methodName, request }));
          channel.publish(this.exchangeName, this.requestQueueName, payload, {
            correlationId,
            replyTo: this.responseQueueName,
          });
        })
        .catch((error: unknown) => {
          this.rpcRequests.delete(correlationId);
          observer.error(RabbitMQRpcClient.toError(error));
        });

      // Teardown: forget the call so abandoned requests do not accumulate.
      return () => {
        cancelled = true;
        this.rpcRequests.delete(correlationId);
      };
    });
  }

  /** Returns a fresh RFC 4122 version 4 UUID to use as a correlation id. */
  private generateUuid(): string {
    return randomUUID();
  }

  /** Normalizes an arbitrary thrown value into an `Error`. */
  private static toError(value: unknown): Error {
    return value instanceof Error ? value : new Error(String(value));
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Example: ask the `calculator` service to add two numbers and print the reply.
const rpcClient = new RabbitMQRpcClient(['amqp://localhost']);

rpcClient
  // The request type describes the payload actually sent ({ x, y }), not a bare number.
  .callRpcServer<{ x: number; y: number }, number>('calculator', 'add', { x: 2, y: 3 })
  // Observer-object form: the positional (next, error) overload is deprecated in RxJS 7.
  .subscribe({
    next: (result) => {
      console.log(`Result: ${result}`);
    },
    error: (error) => {
      console.error(`Error: ${error}`);
    },
  });
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment