# frozen_string_literal: true

# Minimal threaded, line-based TCP key/value server with a peer-node registry.
#
# Usage: ruby <this file> [port]   (port defaults to 4143)
#
# Every client is greeted with "EHLO" and may then send one command per line
# (terminated by "\n" or "\r\n"):
#
#   GET <key>             reply with the stored value (empty line if absent)
#   PUT <key> <value>     store the value, reply "OK <key>"
#   DEL <key>             remove the key, reply "OK DELETED <key>"
#   LIST                  reply with the key count followed by the keys
#   ADD_NODE <ip> <port>  connect to a peer, push the current database to it
#                         and announce it to the already-known peers
#   DEL_NODE <ip> <port>  forget a peer and announce its removal to the rest
#   LIST_NODES            reply with the peer count followed by "ip:port" lines
#   QUIT                  reply "OK BYE" and close the connection
#
# Anything else is answered with "UNKNOWN MESSAGE: <line>".

require 'socket'
require 'logger'

server_port = (ARGV.first || 4143).to_i
server = TCPServer.new '0.0.0.0', server_port
logger = Logger.new STDOUT

logger.info "Listening on #{server_port}"

# NOTE(review): both hashes are shared by every client thread and are not
# protected by any lock.
database = {}
nodes = {} # [ip, port] => open TCPSocket to that peer

EOL = /\r?\n\z/

# Command patterns are compiled once here instead of on every received line.
GET_COMMAND        = /\AGET ([^ \n]+?)#{EOL}/
DEL_COMMAND        = /\ADEL ([^ \n]+?)#{EOL}/
PUT_COMMAND        = /\APUT ([^ \n]+?) ([^\n]+?)#{EOL}/
LIST_COMMAND       = /\ALIST#{EOL}/
QUIT_COMMAND       = /\AQUIT#{EOL}/
ADD_NODE_COMMAND   = /\AADD_NODE ([^ ]+) (\d+)#{EOL}/
DEL_NODE_COMMAND   = /\ADEL_NODE ([^ ]+) (\d+)#{EOL}/
LIST_NODES_COMMAND = /\ALIST_NODES#{EOL}/

loop do
  # One thread per accepted client connection.
  Thread.new server.accept do |connection|
    _, peer_port, _, peer_ip = connection.peeraddr
    client_info = "#{peer_ip}:#{peer_port}"
    logger.info "Accepted a connection from #{client_info}"

    begin
      connection.puts 'EHLO'

      loop do
        message = connection.gets

        # `gets` returns nil once the peer has closed its end; without this
        # check the server would keep answering "UNKNOWN MESSAGE" to nobody.
        if message.nil?
          logger.info "Client #{client_info} disconnected (EOF)"
          break
        end

        case message
        when GET_COMMAND
          key = Regexp.last_match(1)

          connection.puts database[key]
        when DEL_COMMAND
          key = Regexp.last_match(1)

          database.delete key

          connection.puts "OK DELETED #{key}"
        when PUT_COMMAND
          key = Regexp.last_match(1)
          value = Regexp.last_match(2)
          logger.info [key, value].inspect

          database[key] = value
          connection.puts "OK #{key}"
        when LIST_COMMAND
          connection.puts database.size
          connection.puts database.keys
        when QUIT_COMMAND
          connection.puts 'OK BYE'
          logger.info "Disconnecting #{client_info}"
          break # the socket itself is closed by the ensure clause below
        when ADD_NODE_COMMAND
          # The pattern has exactly two groups: the address and the port.
          ip = Regexp.last_match(1)
          port = Regexp.last_match(2)
          new_node = [ip, port]
          new_node_info = "#{ip}:#{port}"

          if nodes[new_node]
            logger.info "Ignoring known node #{new_node_info}"
            connection.puts "NODE EXISTS #{new_node_info}"
            next
          end

          nodes[new_node] = TCPSocket.new ip, port

          logger.info "Added node #{new_node_info}"
          connection.puts "ADDED NODE #{new_node_info}"

          # Push our DB to the new node
          database.each do |key, value|
            nodes[new_node].puts "PUT #{key} #{value}"
          end

          # Announce the new node to all other nodes. The recipient's address
          # is destructured into its own names so that the announced ip/port
          # stay those of the new node.
          nodes.each do |(node_ip, node_port), node_connection|
            next if [node_ip, node_port] == new_node

            logger.info "Announcing #{new_node_info} to #{node_ip}:#{node_port}"
            node_connection.puts "ADD_NODE #{ip} #{port}"
          end
        when DEL_NODE_COMMAND
          ip = Regexp.last_match(1)
          port = Regexp.last_match(2)
          removed_node = [ip, port]
          removed_node_info = "#{ip}:#{port}"

          unless nodes[removed_node]
            logger.info "Ignoring unknown node #{removed_node_info}"
            connection.puts "UNKNOWN NODE #{removed_node_info}"
            next
          end

          # Actually forget the node and release its socket.
          nodes.delete(removed_node).close

          logger.info "Deleted node #{removed_node_info}"
          connection.puts "DELETED NODE #{removed_node_info}"

          # Announce the removal to all remaining nodes
          nodes.each do |(node_ip, node_port), node_connection|
            logger.info "Announcing node #{removed_node_info} removal to #{node_ip}:#{node_port}"
            node_connection.puts "DEL_NODE #{ip} #{port}"
          end
        when LIST_NODES_COMMAND
          connection.puts nodes.size
          connection.puts nodes.keys.map { |node_ip, node_port| "#{node_ip}:#{node_port}" }
        else
          connection.puts "UNKNOWN MESSAGE: #{message}"
          logger.warn "Unsupported message received from #{client_info}: #{message}"
        end
      end
    rescue Errno::EPIPE => error
      logger.info "Client #{client_info} disconnected (#{error.class}: #{error.message})"
    rescue StandardError => error
      # StandardError rather than Exception, so signals and SystemExit are
      # not swallowed by a client thread.
      logger.error error
    ensure
      connection.close
    end
  end
end