Forked from michaelklishin/automatic_recovery_of_queues_bindings_and_consumers_example.rb
Created
September 23, 2011 13:22
-
-
Save akm/1237311 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
require "rubygems" | |
require "amqp" # requires version >= 0.8.0.RC14 | |
puts "=> Example of automatic AMQP channel and queues recovery" | |
puts | |
AMQP.start(:host => "localhost") do |connection, open_ok| | |
puts "AMQP.start block runs" | |
# on_open, on_closedに渡されたブロックは、何度再接続をしても最初の一度だけしか呼び出されないが、 | |
# after_recovery(on_recovery)に渡されたブロックは、再接続の度に呼び出されます。 | |
connection.on_open do |*args| | |
puts "on_open: " << args.inspect | |
end | |
connection.on_closed do |*args| | |
puts "on_closed: " << args.inspect | |
end | |
connection.after_recovery do |session, settings| | |
puts "after_recovery: " << settings.inspect | |
end | |
connection.on_error do |conn, connection_close| | |
puts "connection.on_error channel_close.reply_text. channel_close: " << connection_close.inspect | |
# conn.reconnect(false, 2) | |
# puts "raising channel_close.reply_text. channel_close: " << connection_close.inspect | |
# raise connection_close.reply_text | |
end | |
connection.on_tcp_connection_loss do |conn, settings| | |
puts "[network failure] Trying to reconnect..." | |
conn.reconnect(false, 2) | |
end | |
ch1 = AMQP::Channel.new(connection, :auto_recovery => true) | |
# ch1.auto_recovery = true | |
ch1.on_error do |ch, channel_close| | |
puts "raising channel_close.reply_text. channel_close: " << channel_close.inspect | |
raise channel_close.reply_text | |
end | |
if ch1.auto_recovering? | |
puts "Channel #{ch1.id} IS auto-recovering" | |
end | |
ch1.queue("amqpgem.examples.queue1", :auto_delete => true).bind("amq.fanout") | |
ch1.queue("amqpgem.examples.queue2", :auto_delete => true).bind("amq.fanout") | |
ch1.queue("amqpgem.examples.queue3", :auto_delete => true).bind("amq.fanout").subscribe do |metadata, payload| | |
end | |
show_stopper = Proc.new { | |
connection.disconnect{ puts "Disconnected. Exiting…"; EventMachine.stop } | |
} | |
Signal.trap "TERM", show_stopper | |
Signal.trap "INT", show_stopper | |
EM.add_timer(30, show_stopper) | |
puts "Connected, authenticated. To really exercise this example, shut AMQP broker down for a few seconds. If you don't it will exit gracefully in 30 seconds." | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment