Skip to content

Instantly share code, notes, and snippets.

@deepj
Created April 15, 2020 18:50

Revisions

  1. deepj created this gist Apr 15, 2020.
    50 changes: 50 additions & 0 deletions nats-client.rb
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,50 @@
    # frozen_string_literal: true

    require 'async/io'
    require 'async/io/stream'
    require 'async/pool/controller'
    require 'async/await'

    module Async
    module NATS
    def self.local_endpoint
    Async::IO::Endpoint.tcp('localhost', 4222)
    end

    class Client
    include Async::Await

    def initialize(endpoint = NATS.local_endpoint, **options)
    @endpoint = endpoint

    @pool = connect(**options)
    end

    def ping
    call("PING")
    end

    def call(*arguments)
    @pool.acquire do |connection|
    connection.write_lines(arguments)
    connection.flush
    return connection.read_response
    end
    end

    private

    def connect(**options)
    Async::Pool::Controller.wrap(**options) do
    peer = @endpoint.connect
    peer.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)

    stream = IO::Stream.new(peer)
    @protocol = Async::IO::Protocol::Line.new(stream, CLRF)
    end
    end
    end
    end
    end