Created
November 21, 2013 00:06
-
-
Save soupmatt/7573582 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class BaseDaemon | |
attr_reader :logger | |
def initialize | |
@stopping = false | |
@logger = ::Logger.new(STDOUT) | |
@ending_thread = Thread.new do | |
Thread.stop | |
logger.info "Received a signal in #{name}" | |
stop | |
Thread.main.wakeup | |
end | |
ending_proc = proc do | |
@ending_thread.wakeup | |
end | |
trap "TERM", &ending_proc | |
trap "INT", &ending_proc | |
trap "USR2" do | |
Thread.new do | |
threads = Thread.list.reject{ |t| t == Thread.current } | |
puts "here are the other #{threads.size} threads" | |
threads.each do |t| | |
name = case t | |
when Thread.main | |
"Main #{t.inspect}" | |
when @ending_thread | |
"Ending #{t.inspect}" | |
else | |
t.inspect | |
end | |
puts ([name] + t.backtrace).join("\n ") | |
end | |
end | |
end | |
end | |
def join | |
@ending_thread.join | |
end | |
def name | |
self.class.name | |
end | |
def stop | |
@stopping = true | |
end | |
def stopping? | |
@stopping | |
end | |
def run | |
raise "must be implemented in subclass" | |
end | |
def with_error_rescue | |
begin | |
yield | |
rescue => e | |
report_error(e, "Error in run loop for #{name}") | |
end | |
end | |
def report_error(e, message) | |
log_error(e, message) | |
end | |
def log_error(e, message) | |
logger.error exception_to_pretty_string(e, message) | |
end | |
def exception_to_pretty_string(error, message=nil) | |
lines = [] | |
msg = "" | |
if message | |
msg = "#{message}\n" | |
end | |
lines << "#{msg}#{error.class.name}: #{error.message}" | |
lines += error.backtrace | |
lines.join(" \n") | |
end | |
end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require_relative 'base_daemon' | |
class MessageConsumerDaemon < BaseDaemon | |
attr_reader :config, :session | |
def initialize(config, use_close_with_cleanup=true) | |
super() | |
@config = config | |
@use_close_with_cleanup = use_close_with_cleanup | |
create_bunny_connection | |
end | |
def run | |
retry_delay = 1 | |
begin | |
with_error_rescue do | |
begin | |
bunny_restart if retry_delay > 1 | |
logger.info "bunny connected!" | |
create_subscriptions | |
retry_delay = 1 | |
join | |
rescue => e | |
report_error(e, "Error for #{name}. Attempting to reconnect to RabbitMQ in #{retry_delay} seconds.") | |
with_error_rescue do | |
bunny_restart | |
end | |
sleep retry_delay | |
if retry_delay >= 30 | |
retry_delay = 60 | |
else | |
retry_delay *= 2 | |
end | |
logger.info "attempting to reconnect..." | |
end | |
end | |
end until stopping? | |
end | |
def create_bunny_connection | |
@session = Bunny.new(config) | |
@session.start | |
end | |
def bunny_restart | |
if @use_close_with_cleanup | |
session.close_with_cleanup | |
else | |
session.close | |
end | |
create_bunny_connection | |
end | |
def stop | |
super | |
if @use_close_with_cleanup | |
session.close_with_cleanup | |
else | |
session.close | |
end | |
end | |
def create_subscriptions | |
3.times do |i| | |
ch = session.create_channel | |
ch.queue("queue1").subscribe do |_, _, payload| | |
puts "queue1 consumer #{i} got a message with payload #{payload}" | |
end | |
end | |
2.times do |i| | |
ch = session.create_channel | |
ch.queue("queue2").subscribe do |_, _, payload| | |
puts "queue2 consumer #{i} got a message with payload #{payload}" | |
end | |
end | |
end | |
end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'bunny' | |
require_relative 'session_patch' | |
require_relative 'message_consumer_daemon' | |
bunny_config_hash = { | |
vhost: "bunny_testbed", | |
automatically_recover: false | |
} | |
mcd = MessageConsumerDaemon.new(bunny_config_hash) | |
mcd.run |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'bunny' | |
module Bunny | |
class Session | |
def close_with_cleanup | |
log_errors { self.close } | |
@channels.each do |_, channel| | |
log_errors { channel.maybe_kill_consumer_work_pool! } | |
end | |
log_errors { self.maybe_shutdown_heartbeat_sender } | |
log_errors { self.maybe_shutdown_reader_loop } | |
log_errors { self.close_transport } | |
end | |
def log_errors | |
begin | |
yield | |
rescue => e | |
@logger.info "#{e.class}: #{e.to_s}" | |
end | |
end | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment