Created
June 9, 2011 04:15
-
-
Save dan-manges/1016048 to your computer and use it in GitHub Desktop.
mongrel2 timeout handler
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Demo backend served through m2node. Every request is answered with a
# plain-text "Hello World"; a URL containing "sleep" followed by one character
# and a number (for example /sleep/3) delays the answer by that many seconds.
http = require 'http'
m2node = require 'm2node'

server = http.createServer((req, res) ->
  console.log("#{req.method} #{req.url}")

  # Writes the fixed 200 response and ends it.
  sendResponse = ->
    res.writeHead(200, {'Content-Type': 'text/plain'})
    res.end('Hello World\n')

  match = req.url.match(/sleep.(\d+)/)
  if match
    # Parse with an explicit radix so a leading zero is never read as octal,
    # then convert seconds to milliseconds.
    setTimeout(sendResponse, parseInt(match[1], 10) * 1000)
  else
    sendResponse()
)

# These endpoints are the ones the Ruby timeout handler binds: 8003 is its
# outgoing request socket and 8004 its incoming response socket.
m2node.run(
  server,
  recv_spec: 'tcp://127.0.0.1:8003'
  send_spec: 'tcp://127.0.0.1:8004'
)

console.log('Ready...')
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Handler for all traffic. The Ruby timeout handler connects to both
# endpoints: it reads requests from 8001 and publishes responses to 8002.
app = Handler(
    send_spec = 'tcp://127.0.0.1:8001',
    send_ident = '81b7114c-534c-4107-9f17-b317cfd59f62',
    recv_spec = 'tcp://127.0.0.1:8002',
    recv_ident = ''
)

# Single host that routes every path to the handler above.
localhost = Host(name = 'localhost', routes = {
    '/': app
})

# Server definition listening on port 8000.
main = Server(
    name = "timeout_example",
    port = 8000,
    uuid = '5dc1fbe7-d9db-4602-8d19-80c7ef2b1b11',
    access_log = "/logs/access.log",
    error_log = "/logs/error.log",
    chroot = ".",
    default_host = "localhost",
    pid_file = "/run/mongrel2.pid",
    hosts = [localhost]
)

servers = [main]
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'em-zeromq' | |
# Socket callback object for the inbound request socket: hands the string
# content of every received message to the timeout handler.
class RequestHandler
  # @param timeout_handler [#request_received] receiver of each request string
  def initialize(timeout_handler)
    @handler = timeout_handler
  end

  # Forwards each readable message, in order, to the timeout handler.
  #
  # @param socket [Object] the socket that became readable (not used here)
  # @param messages [Enumerable<#copy_out_string>] messages read from the socket
  # @return [Object] whatever +messages.each+ returns
  def on_readable(socket, messages)
    messages.each { |m| @handler.request_received(m.copy_out_string) }
  end
end
# Socket callback object for the inbound response socket: hands the string
# content of every received message to the timeout handler.
class ResponseHandler
  # @param timeout_handler [#response_received] receiver of each response string
  def initialize(timeout_handler)
    @handler = timeout_handler
  end

  # Forwards each readable message, in order, to the timeout handler.
  #
  # @param socket [Object] the socket that became readable (not used here)
  # @param messages [Enumerable<#copy_out_string>] messages read from the socket
  # @return [Object] whatever +messages.each+ returns
  def on_readable(socket, messages)
    messages.each { |m| @handler.response_received(m.copy_out_string) }
  end
end
# Proxy that sits between the web server's handler sockets and a backend.
# Every request is forwarded to the backend and a timer is armed for its
# connection; a backend response cancels the timer and is forwarded back. If
# the timer fires first, a canned 503 is sent instead and any later backend
# response for that connection is dropped.
class TimeoutHandler
  # Seconds the backend has to answer before the 503 is sent.
  TIMEOUT_IN_SECONDS = 5

  # Complete HTTP response (chunked encoding) sent when a request times out.
  # "13" is the hexadecimal byte length (19) of the chunk that follows it.
  TIMEOUT_RESPONSE_BODY = [
    "HTTP/1.1 503 Service Unavailable",
    "Content-Type: text/plain",
    "Transfer-Encoding: chunked",
    "",
    "13", "Request timed out.\n",
    "0", "", ""
  ].join("\r\n").freeze

  # Leading part of a response message: "<uuid> <size>:<id> [<id> ...],".
  # Capture 1 is the server uuid, capture 2 the space-separated target ids.
  RESPONSE_HEADER = /\A(\S+) \d+:([^,]*),/

  # Creates the four sockets and wires them to a new TimeoutHandler.
  # Must be called from inside a running EventMachine reactor.
  #
  # @return [Object] the result of subscribing the response socket
  def self.start
    context = EM::ZeroMQ::Context.new(1)
    responses_out = context.connect(ZMQ::PUB, 'tcp://127.0.0.1:8002')
    requests_out = context.bind(ZMQ::PUSH, 'tcp://127.0.0.1:8003')
    timeout_handler = new(requests_out, responses_out)

    request_handler = RequestHandler.new(timeout_handler)
    response_handler = ResponseHandler.new(timeout_handler)
    context.connect(ZMQ::PULL, 'tcp://127.0.0.1:8001', request_handler)
    responses_in = context.bind(ZMQ::SUB, 'tcp://127.0.0.1:8004', response_handler)
    # An empty prefix subscribes to every published message.
    responses_in.setsockopt(ZMQ::SUBSCRIBE, '')
  end

  # @param requests_out [#send_msg] socket used to forward requests to the backend
  # @param responses_out [#send_msg] socket used to send responses to the server
  def initialize(requests_out, responses_out)
    @timers = {}
    @requests_out = requests_out
    @responses_out = responses_out
  end

  # Forwards a request to the backend and arms the timeout for its connection.
  #
  # @param request [String] raw message starting with "<uuid> <connection id> "
  # @return [void]
  def request_received(request)
    @requests_out.send_msg(request)
    server_uuid, connection_id = request.split(' ', 3)
    # Without a connection id there is nobody to send a timeout to.
    return unless connection_id

    # A previous timer for the same connection must not outlive its
    # replacement, otherwise it would later fire a stray 503.
    cancel_timer(server_uuid, connection_id)
    @timers[timer_key(server_uuid, connection_id)] =
      EM::Timer.new(TIMEOUT_IN_SECONDS) do
        _handle_timeout(server_uuid, connection_id)
      end
  end

  # Forwards a backend response to every target connection that is still
  # waiting, cancelling their timers. Targets that already timed out (or were
  # never seen) are removed from the address list; if none remain, or the
  # message has no recognisable header, the response is dropped.
  #
  # @param response [String] raw message "<uuid> <size>:<ids>, <body>"
  # @return [void]
  def response_received(response)
    header = RESPONSE_HEADER.match(response)
    return unless header

    server_uuid = header[1]
    targets = header[2].split
    pending = targets.select { |id| cancel_timer(server_uuid, id) }
    return if pending.empty?

    if pending.size == targets.size
      @responses_out.send_msg(response)
    else
      # Re-address the message so connections that already received the
      # timeout response do not get a second response.
      ids = pending.join(' ')
      @responses_out.send_msg("#{server_uuid} #{ids.bytesize}:#{ids},#{header.post_match}")
    end
  end

  # Sends the canned timeout response to one connection and forgets its timer.
  #
  # @param uuid [String] server uuid taken from the request
  # @param id [String] connection id taken from the request
  # @return [void]
  def _handle_timeout(uuid, id)
    @timers.delete(timer_key(uuid, id))
    @responses_out.send_msg("#{uuid} #{id.bytesize}:#{id}, #{TIMEOUT_RESPONSE_BODY}")
  end

  private

  # Key under which a connection's timer is stored.
  def timer_key(uuid, id)
    "#{uuid} #{id}"
  end

  # Cancels and forgets the timer of a connection.
  # Returns true when a timer was pending, false otherwise.
  def cancel_timer(uuid, id)
    timer = @timers.delete(timer_key(uuid, id))
    return false unless timer

    timer.cancel
    true
  end
end
# Run the EventMachine reactor: set up the timeout handler's sockets inside
# it, then report readiness on stdout.
EM.run do
  TimeoutHandler.start
  puts "Ready..."
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment