Created
August 26, 2010 21:10
-
-
Save apeiros/552264 to your computer and use it in GitHub Desktop.
Thread::Pool
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'thread' | |
class Thread
  # A fixed-size (but resizable) pool of worker threads that process jobs
  # from a shared queue.
  #
  # Example:
  #   pool = Thread::Pool.new(10) do |exception| handle(exception) end
  #   pool.execute(1,2,3) do |x,y,z| whatever(x,y,z) end
  #   pool.join
  class Pool
    # The default exception handler will just reraise the exception in the
    # main thread
    DefaultExceptionHandler = proc { |exception|
      Thread.main.raise(exception)
    }

    # We must not rescue those exceptions in workers
    PassthroughExceptions = [
      ::NoMemoryError,
      ::SignalException,
      ::Interrupt,
      ::SystemExit,
    ].freeze

    # The number of workers in the pool
    attr_reader :size

    # The exception handling proc
    attr_reader :exception_handler

    # Whether the pool is running
    # A pool starts running right at #new
    attr_reader :running
    alias running? running

    # Creates a pool with `size` worker threads and starts them immediately.
    # The optional block is called with the exception whenever a job raises;
    # without a block, DefaultExceptionHandler is used.
    def initialize(size, &exception_handler)
      @size              = size                 # the desired workforce-size
      @exception_handler = exception_handler || DefaultExceptionHandler
      @jobs              = Queue.new            # [block, args] pairs; a nil tells one worker to stop
      @join              = Queue.new            # workers that stopped after receiving a nil push themselves here
      @worker_mutex      = Mutex.new            # synchronizes every change of @workers
      @size_mutex        = Mutex.new            # synchronizes every change of @size and @retiring
      @retiring          = 0                    # stop signals enqueued whose worker exit #join has not collected yet
      @running           = true                 # whether this pool still accepts new jobs
      @workers           = []                   # the worker threads processing incoming jobs
      spawn_worker(size)
    end

    # The number of jobs waiting for execution
    def pending_jobs_count
      @jobs.size
    end

    # Resize the pool.
    # Growing spawns the missing workers right away. Shrinking enqueues one
    # stop signal per surplus worker, so the surplus workers only terminate
    # after the jobs that were queued before the resize have been taken.
    def size=(new_size)
      @size_mutex.synchronize do
        surplus = @size - new_size
        if surplus < 0
          spawn_worker(-surplus)
        elsif surplus > 0
          surplus.times { @jobs << nil }
          # Each of these workers will announce itself on @join when it
          # stops; remember that so #join does not mistake those
          # announcements for the workers it is waiting for.
          @retiring += surplus
        end
        @size = new_size
      end
    end

    # Executes the given block as soon as a worker is free.
    # `args` are passed to the block when it is run.
    # Raises RuntimeError if the pool is no longer running and ArgumentError
    # if no block is given.
    def execute(*args, &block)
      raise "Can't execute on a dead pool" unless @running
      raise ArgumentError, "no block given" unless block

      @jobs << [block, args]
    end

    # Spins the pool down (it won't accept any new jobs to execute)
    # and waits for all pending jobs to be completed.
    # Calling it again on a pool that is already stopped (joined or killed)
    # returns without blocking. Returns self.
    def join
      @size_mutex.synchronize do
        if @running
          @running = false
          @size.times { @jobs << nil }
          @retiring += @size
        end
        # Collect every outstanding worker exit, including those of workers
        # retired earlier through #size=.
        while @retiring > 0
          @join.shift
          @retiring -= 1
        end
      end
      self
    end

    # Kills all worker threads (using Thread#kill).
    # Only use this if you are absolutely sure that the data currently being
    # processed cannot be corrupted by, and is not needed after, the kill.
    # Returns self.
    def kill
      @running = false
      # Iterate over a snapshot: dying workers remove themselves from
      # @workers, which must not happen underneath the iteration.
      doomed = @worker_mutex.synchronize { @workers.dup }
      doomed.each do |thread|
        thread.kill
        # A killed worker never announces itself on @join; push a token so
        # that a concurrent or later #join does not wait for it forever.
        @join << thread
      end
      self
    end

    def inspect # :nodoc:
      sprintf "#<%p:0x%x size=%d pending=%d%s>",
              self.class,
              object_id << 1,
              @size,
              pending_jobs_count,
              @running ? '' : ' dead'
    end

    private

    # Registers `amount` new threads in @workers.
    # @workers is only modified while holding @worker_mutex.
    def spawn_worker(amount = 1)
      @worker_mutex.synchronize do
        @workers.concat(Array.new(amount) { Thread.new { worker } })
      end
    end

    # The worker method is the body of every worker thread.
    # It fetches jobs from the @jobs queue until a nil is passed.
    # After that it'll terminate and push the Thread instance on @join.
    # In case of an exception in a job it'll invoke the exception_handler
    # and continue working if possible. If the exception handler itself
    # raises, the worker starts a replacement thread before it dies.
    # Exceptions listed in PassthroughExceptions are re-raised untouched.
    # Before it exits, it removes itself from @workers.
    def worker
      while (job = @jobs.shift)
        begin
          block, args = job
          block.call(*args)
        rescue *PassthroughExceptions
          raise # we should pass those on
        rescue Exception => e # try to stay alive - it costs less
          @exception_handler.call(e)
        end
      end
    rescue *PassthroughExceptions
      raise # we should pass those on
    rescue Exception
      spawn_worker
      raise
    else
      @join << Thread.current
    ensure
      @worker_mutex.synchronize do
        @workers.delete(Thread.current)
      end
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment