Created
January 25, 2017 14:25
-
-
Save jazzytomato/f919c8db3570b4d4ad9526c0f14a9f94 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Enumerable extensions | |
module Enumerable | |
# this will be able to handle a pool of workers (actors) | |
# can be useful to have a pool to limit the amount of concurrent work | |
class ParallelWorker | |
POOL_SIZE = ENV.fetch('CELLULOID_POOL_SIZE', [Celluloid.cores, 2].max).to_i | |
include Celluloid | |
def yielder(element, block) | |
block.call(element) | |
end | |
end | |
# similar to clojure.core/pmap | |
# like map, but each iteration is processed in a different thread | |
# to optimise for blocking i/o | |
def pmap(size = ParallelWorker::POOL_SIZE, &block) | |
ActiveRecord::Base.connection_pool.with_connection do | |
pool = ParallelWorker.pool(size: size) | |
futures = map { |elem| pool.future(:yielder, elem, block) } | |
futures.map(&:value) | |
end | |
ensure | |
pool.terminate if pool.alive? | |
end | |
def pselect(size = ParallelWorker::POOL_SIZE, &block) | |
ActiveRecord::Base.connection_pool.with_connection do | |
pool = ParallelWorker.pool(size: size) | |
tuples = map do |elem| | |
[pool.future(:yielder, elem, block), elem].freeze | |
end | |
tuples.map { |(future, elem)| elem if future.value }.compact | |
end | |
ensure | |
pool.terminate if pool.alive? | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment