Created
March 27, 2021 14:30
-
-
Save entone/2d9c39d401f91d755d077abdad0d056a to your computer and use it in GitHub Desktop.
Basic TCP/SSL Client
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Broker.Client do
  @moduledoc """
  GenServer that owns a TLS connection to a broker.

  The connection is opened right after `init/1` (via `handle_continue/2`) so
  that starting the process never blocks on the network. When a connection
  attempt fails, it is retried after a fixed delay. When the peer closes the
  connection, a reconnect is attempted immediately.

  The socket is opened with `active: :once`, so exactly one `{:ssl, socket,
  data}` message is delivered at a time; the socket is re-armed after each
  packet has been handed to the message handler.

  Incoming data is passed to `Broker.Parser.parse/1`. Each `%Broker.Message{}`
  it yields is forwarded to the configured handler module as

      handler.handle_message(message: msg, registry: registry, broker: broker_name)

  and each `{:error, reason}` it yields is logged.
  """

  use GenServer
  require Logger
  alias Broker.{Message, Parser}

  # Delay, in milliseconds, before retrying a failed connection attempt.
  @reconnect_delay 500

  defmodule State do
    @moduledoc false

    defstruct [
      :socket,
      :host,
      :port,
      :cert,
      :message_handler,
      :registry,
      :broker_name,
      connected: false
    ]
  end

  ## Client API

  @doc """
  Sends `data` over the broker connection owned by `broker`.

  Returns `:ok` when the data was written, `{:error, :disconnected}` when no
  connection is currently established, or the `{:error, reason}` tuple returned
  by `:ssl.send/2` when the write itself failed.
  """
  @spec send(GenServer.server(), iodata()) :: :ok | {:error, term()}
  def send(broker, data) do
    GenServer.call(broker, {:send, data})
  end

  @doc """
  Starts the client.

  `opts` is a keyword list; all of the following keys are required and may be
  given in any order:

    * `:host` - broker host (binary, charlist, atom or IP tuple)
    * `:port` - broker port
    * `:certificate` - value passed as the `:cert` option to `:ssl.connect/3`
    * `:message_handler` - module whose `handle_message/1` receives parsed messages
    * `:registry` - passed through to the message handler
    * `:broker_name` - passed through to the message handler
    * `:name` - name under which the GenServer is registered

  Raises `KeyError` if a required key is missing.
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts) when is_list(opts) do
    # Look keys up individually instead of matching the literal keyword list,
    # so callers are not forced to pass the options in one specific order.
    init_arg = {
      Keyword.fetch!(opts, :host),
      Keyword.fetch!(opts, :port),
      Keyword.fetch!(opts, :certificate),
      Keyword.fetch!(opts, :message_handler),
      Keyword.fetch!(opts, :registry),
      Keyword.fetch!(opts, :broker_name)
    }

    GenServer.start_link(__MODULE__, init_arg, name: Keyword.fetch!(opts, :name))
  end

  ## Server callbacks

  @impl true
  def init({host, port, cert, handler, registry, broker_name}) do
    state = %State{
      host: host,
      port: port,
      cert: cert,
      message_handler: handler,
      registry: registry,
      broker_name: broker_name
    }

    # Defer the (potentially slow) connect so start_link/1 returns promptly.
    {:ok, state, {:continue, :connect}}
  end

  @impl true
  def handle_continue(:connect, %State{host: host, port: port, cert: cert} = state) do
    case connect(host, port, cert) do
      {:ok, socket} ->
        {:noreply, %State{state | socket: socket, connected: true}}

      {:error, reason} ->
        Logger.error(
          "Error connecting to broker #{format_host(host)}:#{port} - #{inspect(reason)}"
        )

        Process.send_after(self(), :connect, @reconnect_delay)
        {:noreply, state}
    end
  end

  @impl true
  def handle_info(:connect, state) do
    # Retry timer fired: run the same connect path used at startup.
    {:noreply, state, {:continue, :connect}}
  end

  def handle_info(
        {:ssl, _s, data},
        %State{
          socket: socket,
          message_handler: handler,
          registry: registry,
          broker_name: broker_name
        } = state
      ) do
    handle_packet(handler, registry, broker_name, data)
    # The socket is in `active: :once` mode; re-arm it for the next packet.
    :ssl.setopts(socket, active: :once)
    {:noreply, state}
  end

  def handle_info({:ssl_closed, _s}, state) do
    Logger.error("Broker Connection Closed")
    # Drop the dead socket so stale state can never be used for a send.
    {:noreply, %State{state | socket: nil, connected: false}, {:continue, :connect}}
  end

  def handle_info({:ssl_error, _s, reason}, state) do
    # NOTE(review): the connection is left marked as connected here; confirm
    # whether an :ssl_closed message always follows an :ssl_error.
    Logger.error("Broker Connection Error: #{inspect(reason)}")
    {:noreply, state}
  end

  def handle_info(msg, state) do
    Logger.warn("Unknown Message Type: #{inspect(msg)}")
    {:noreply, state}
  end

  @impl true
  def handle_call({:send, data}, _from, %State{socket: socket, connected: true} = state) do
    # Reply with the real outcome of the write; previously a failed send was
    # silently reported to the caller as :ok.
    {:reply, :ssl.send(socket, data), state}
  end

  def handle_call({:send, _data}, _from, %State{connected: false} = state) do
    Logger.error("No connection to socket, cannot send")
    {:reply, {:error, :disconnected}, state}
  end

  ## Internals

  # Parses one chunk of received data and dispatches every parsed item.
  defp handle_packet(handler, registry, broker, data) do
    data
    |> Parser.parse()
    |> Enum.each(&maybe_handle_message(&1, handler, registry, broker))
  end

  # Forwards a parsed message to the handler module; logs parse errors.
  defp maybe_handle_message(%Message{} = msg, handler, registry, broker) do
    handler.handle_message(message: msg, registry: registry, broker: broker)
  end

  defp maybe_handle_message({:error, er}, _, _, _) do
    Logger.error("Unable to parse packet: #{inspect(er)}")
  end

  # Opens the TLS connection in binary mode, delivering one message at a time.
  defp connect(host, port, cert) do
    :ssl.connect(normalize_host(host), port, active: :once, mode: :binary, cert: cert)
  end

  # :ssl.connect/3 does not accept Elixir binaries as host names, so convert
  # them to charlists; every other host form is passed through untouched.
  defp normalize_host(host) when is_binary(host), do: String.to_charlist(host)
  defp normalize_host(host), do: host

  # Renders the host for log output. IP tuples do not implement String.Chars,
  # so interpolating them directly would raise on the error path.
  defp format_host(host) when is_binary(host) or is_list(host) or is_atom(host),
    do: to_string(host)

  defp format_host(host), do: inspect(host)
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment