# Gist iamatypeofwalrus/8105e291760c6235025a63fe65ed7705 (created March 1, 2017 21:56)
# Ruby AWS SDK V2 ConnectionPool forking issue
# Reproduce the crash:
#   AWS_ACCESS_KEY_ID=*** ruby connection_pool_issue.rb
#
# Run with the workaround enabled (does not crash; any value of the
# variable enables it):
#   empty_connection_pools_after_fork=true ruby connection_pool_issue.rb
#
# Hypothesis:
#
# The AWS SDK keeps a pool of open HTTP connections in Seahorse
# (Seahorse::Client::NetHttp::ConnectionPool). After fork, file descriptors
# are shared between the parent process and its child processes, and open
# sockets/connections are just file descriptors on Unix-like systems. That
# creates a race condition: the parent and a child can both use the same
# pooled connection at the same time.
#
# The AWS SDK does not provide a top-level API for clearing the connection
# pool, nor does it document this behavior anywhere. You can, however, call
# into Seahorse::Client::NetHttp::ConnectionPool and empty the connection
# pools in the child after fork.
require 'json'
require 'logger'
require 'uri'

require 'aws-sdk'
# Load the SDK's autoloaded constants up front (presumably so they are all
# defined before any fork happens), then set up the queue name and the
# client used to look up / create that queue.
Aws.eager_autoload!

QUEUE_NAME = 'enqueue-tests'
V2 = Aws::SQS::Client.new
# When a race condition occurs this hook logs the args passed to
# Aws::Structure.new; various stack traces pointed at that call as the
# point of failure.
#
# Installs a global Kernel.set_trace_func hook that is consulted on every
# method-call event. It acts only on "call" events for Aws::Structure.new
# whose first argument has a non-zero length, and logs the whole argument
# list through +logger+.
#
# The hook must never raise: an exception raised inside a trace func surfaces
# at the traced call site, so a crash here would break the SDK call being
# observed. Calls with no arguments (first arg is nil), calls whose first
# argument has no #length, and frames without an +args+ local are ignored.
#
# @param logger [#info] destination for the diagnostic message
# @return [Proc] the installed trace proc (what Kernel.set_trace_func returns)
def register_kernel_func(logger)
  tracer = Proc.new do |event, _file, _line, id, frame_binding, klass|
    next unless event == "call" && klass == Aws::Structure && id == :new
    # Avoid NameError if the method's parameter is not named +args+.
    next unless frame_binding.local_variable_defined?(:args)

    args = frame_binding.local_variable_get(:args)
    first = args.first
    # Original code called first.length unconditionally and blew up on nil.
    if first.respond_to?(:length) && first.length > 0
      logger.info("Aws::Structure.new received args that will cause a TypeError: args: #{args}")
    end
  end
  Kernel.set_trace_func(tracer)
end
# Look up +queue_name+ and return it as an Aws::SQS::Queue, creating the
# queue first when SQS reports that it does not exist.
#
# Only the "queue does not exist" error triggers creation. Any other error
# from get_queue_url (bad credentials, network failure, access denied, ...)
# propagates instead of being masked by a create_queue attempt.
#
# @param queue_name [String] name of the SQS queue
# @return [Aws::SQS::Queue] resource wrapping the queue's URL
def create_queue(queue_name)
  queue_url =
    begin
      V2.get_queue_url(queue_name: queue_name).queue_url
    rescue Aws::SQS::Errors::NonExistentQueue
      V2.create_queue(queue_name: queue_name).queue_url
    end
  Aws::SQS::Queue.new(url: queue_url)
end
# Publish +num+ messages to +queue+, one SendMessage call apiece. Message i
# (counting from 0) carries the JSON document {"iteration": i},
# percent-encoded with URI.encode. Returns +num+, as Integer#times does.
def send_messages(queue, num)
  num.times do |iteration|
    document = { iteration: iteration }
    encoded = URI.encode(JSON.dump(document))
    queue.send_message(message_body: encoded)
  end
end
# Worker loop: keep at most +num_workers+ forked children alive, long-poll
# +queue+ for just enough messages to fill the free slots, and fork one child
# per message to decode and delete it. The parent keeps calling SQS while the
# children run, which is the concurrent use this script exercises.
#
# @param queue [Aws::SQS::Queue] queue to poll
# @param num_workers [Integer] maximum number of concurrent children; must be >= 1
# @param logger [Logger] receives progress messages (also used by the children)
# @param empty_connection_pools_after_fork [Boolean] when true, each child
#   empties every Seahorse::Client::NetHttp::ConnectionPool before touching SQS
# @raise [ArgumentError] if +num_workers+ is less than 1
# @return does not return normally; the loop runs until an exception escapes
def receive_messages(queue, num_workers, logger, empty_connection_pools_after_fork: false)
  # With no worker slots the loop could never fetch a message, and the
  # blocking wait below would have no child to wait for.
  raise ArgumentError, "num_workers must be at least 1, got #{num_workers}" if num_workers < 1

  children = []
  loop do
    children = reap_children(children)
    num_messages = num_workers - children.length
    if num_messages < 1
      # Every slot is busy: block until some child exits instead of spinning
      # on non-blocking waitpid calls at 100% CPU.
      children.delete(Process.wait)
      next
    end

    logger.info "Parent: calling Aws::SQS::Queue#receive_messages"
    msgs = queue.receive_messages(
      wait_time_seconds: 20,
      # SQS ReceiveMessage accepts at most 10 messages per call.
      max_number_of_messages: [num_messages, 10].min
    )
    logger.info "Parent: received #{msgs.count} messages from #{QUEUE_NAME}"

    msgs.each do |msg|
      pid = fork do
        if empty_connection_pools_after_fork
          logger.info "Attempting to empty connection pools"
          Seahorse::Client::NetHttp::ConnectionPool.pools.each { |pl| pl.empty! }
        end
        # Essentially a no-op.
        # NOTE(review): URI.decode (paired with the URI.encode used when the
        # messages are sent) is obsolete and was removed in Ruby 3.0; migrate
        # both sides together if this script must run on a newer Ruby.
        JSON.parse(URI.decode(msg.body))
        logger.info "Child: deleting message"
        msg.delete
      end
      children << pid
    end
  end
end
# Poll each pid in +children+ without blocking (WNOHANG) and return the pids
# that have not exited yet; exited children are reaped as a side effect.
# Returns a fresh empty array when there is nothing to poll.
def reap_children(children)
  return [] if children.empty?

  exited = children.each_with_object([]) do |pid, done|
    reaped = Process.waitpid(pid, Process::WNOHANG)
    done << reaped unless reaped.nil?
  end
  children - exited
end
# Script entry point: create (or look up) the test queue, enqueue 200
# messages, then process them with up to 10 forked workers until something
# escapes the worker loop. On the way out the queue is purged so a rerun
# starts from an empty queue.
def main
  logger = Logger.new(STDOUT)
  register_kernel_func(logger)

  logger.info "Creating queue with name #{QUEUE_NAME}"
  queue = create_queue(QUEUE_NAME)

  num_messages = 200
  logger.info "Sending #{num_messages} to #{QUEUE_NAME}"
  send_messages(queue, num_messages)

  logger.info "Processing messages in #{QUEUE_NAME}"
  receive_messages(
    queue,
    10,
    logger,
    empty_connection_pools_after_fork: !!ENV["empty_connection_pools_after_fork"]
  )
ensure
  # logger is nil here only if Logger.new itself failed.
  logger.info "Error or interrupt ocurred. Cleaning up" if logger
  # The original `... rescue Aws::SQS::Errors::PurgeQueueInProgress` was a
  # rescue *modifier*: it swallowed every StandardError and merely evaluated
  # to the class. Rescue only the intended error.
  begin
    queue.purge if queue
  rescue Aws::SQS::Errors::PurgeQueueInProgress
    # SQS permits one purge per queue per 60 seconds; an in-flight purge
    # clears the queue anyway, so there is nothing more to do.
  end
end
# Start only when executed directly (ruby connection_pool_issue.rb), so the
# methods in this file can be required elsewhere without entering the
# infinite worker loop.
main if __FILE__ == $PROGRAM_NAME
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment