Created
October 11, 2012 08:55
-
-
Save ulfurinn/3871093 to your computer and use it in GitHub Desktop.
RPC with AMQP
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "amqp" | |
require "fiber" | |
module AMQP_RPC | |
module Client | |
def subscribe | |
response_queue.subscribe do |header, body| | |
id = header.correlation_id | |
if requests.has_key? id | |
requests[id].resume Marshal.load(body) | |
end | |
end | |
make_method :_provided_methods | |
provided_methods.each { |meth| make_method meth } | |
end | |
def requests | |
@requests ||= Hash.new | |
end | |
def connection | |
@connection ||= AMQP.connect host: 'localhost' | |
end | |
def channel | |
@channel ||= AMQP::Channel.new connection | |
end | |
def response_queue | |
if ! @response_queue | |
f = Fiber.current | |
channel.queue( "", exclusive: true, auto_delete: true ) do |queue, declare_ok| | |
f.resume queue | |
end | |
@response_queue = Fiber.yield | |
end | |
@response_queue | |
end | |
private | |
def make_method method | |
self.class.instance_eval do | |
define_method method do |*args| | |
id = rand(10000000).to_s | |
requests[id] = Fiber.current | |
channel.default_exchange.publish( Marshal.dump({method: method, args: args}), routing_key: self.class.name, reply_to: response_queue.name, correlation_id: id ) | |
result = Fiber.yield | |
requests.delete id | |
result | |
end | |
end | |
end | |
def provided_methods | |
@provided_methods ||= _provided_methods | |
end | |
end | |
module Server | |
module ClassMethods | |
def provide *args | |
@provided_methods ||= [] | |
@provided_methods += args | |
@provided_methods.uniq! | |
end | |
def provided_methods | |
@provided_methods | |
end | |
end | |
def self.included base | |
base.extend ClassMethods | |
end | |
def connection | |
@connection ||= AMQP.connect host: 'localhost' | |
end | |
def channel | |
@channel ||= AMQP::Channel.new connection | |
end | |
def queue | |
@queue ||= channel.queue self.class.name | |
end | |
def subscribe | |
queue.subscribe do |header, body| | |
body = Marshal.load body | |
method = body[:method] | |
args = body[:args] | |
result = self.send( method, *args ) | |
channel.default_exchange.publish Marshal.dump(result), routing_key: header.reply_to, correlation_id: header.correlation_id | |
end | |
end | |
private | |
def _provided_methods | |
self.class.provided_methods | |
end | |
end | |
end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby | |
require "./amqp_rpc" | |
class Calculator | |
include AMQP_RPC::Client | |
def initialize | |
subscribe | |
end | |
end | |
EM.run do | |
Fiber.new do | |
c = Calculator.new | |
1.upto(50) { |i| | |
puts c.remote_method(i) | |
} | |
EM.stop { exit } | |
end.resume | |
end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby | |
require "./amqp_rpc" | |
class Calculator | |
include AMQP_RPC::Server | |
provide :remote_method | |
def initialize | |
subscribe | |
end | |
def remote_method x | |
x * 2 | |
end | |
end | |
EM.run do | |
server = Calculator.new | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment