Created
November 21, 2017 14:38
-
-
Save apeiros/0478ccf738f3df9018c35b33ddce4118 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "time" | |
require "pry" | |
require "thread" | |
# Minimal thread-aware logger that writes job lifecycle lines to standard
# output. Every line is prefixed with an ISO 8601 timestamp, the process id
# and the hex object id of the calling thread.
#
# All writes go through a single mutex so lines emitted by different threads
# sharing one logger are written one at a time.
class Logger
  def initialize
    @mutex = Mutex.new
  end

  # Logs that a job was aborted by an exception.
  #
  # @param error [Exception] the exception raised by the job
  # @return [nil]
  def log_error(error)
    emit(Time.now, "Job interrupted through exception #{error.message}")
  end

  # Logs the start of a job, runs the given block, then logs the end of the
  # job. If the block raises, the exception propagates and no "Ending" line
  # is written.
  #
  # @param name [Object] job name; printed via #inspect
  # @param last_execution_ended_at [Time, nil] when the job last finished,
  #   or nil if it has not run yet (the delay is then reported as 0)
  # @yield the job body to execute between the two log lines
  # @return [nil]
  def log_job(name, last_execution_ended_at)
    start_time = Time.now
    delay = last_execution_ended_at ? (start_time - last_execution_ended_at).round : 0
    emit(start_time, "Starting job #{name.inspect}, delay: #{delay}s")

    # The block runs outside the mutex so a long job never blocks other
    # threads from logging.
    yield

    end_time = Time.now
    duration = (end_time - start_time).round
    emit(end_time, "Ending job #{name.inspect}, duration: #{duration}s")
  end

  private

  # Writes one prefixed log line while holding the mutex.
  def emit(time, message)
    tid = Thread.current.object_id.to_s(16)
    line = "[#{time.iso8601}|PID:#{Process.pid}|TID:#{tid}] #{message}"
    @mutex.synchronize { puts line }
  end
end
# Pulls jobs off a shared queue, runs them, and puts each one back on the
# queue once it has finished, so jobs repeat for as long as the worker is
# active.
#
# Note that the constructor itself enters the processing loop, so
# `Worker.new` only returns once the loop has ended.
class Worker
  # @param queue [#shift, #push] source of jobs; finished jobs are pushed back
  # @param logger [#log_job, #log_error] receives job lifecycle events
  def initialize(queue, logger)
    @queue = queue
    @logger = logger
    @active = true
    process_loop
  end

  # Runs jobs until the worker has been stopped or the queue hands back a
  # falsy value instead of a job.
  def process_loop
    loop do
      break unless @active

      current = @queue.shift
      break unless current

      process(current)
      current.last_execution_ended_at = Time.now
      @queue.push(current) # re-queue job after done
    end
  end

  # Asks the loop to finish; the flag is checked before the next job is
  # taken from the queue.
  def stop
    @active = false
  end

  # Runs a single job inside the logger's start/end bracket. Fatal
  # conditions are re-raised; any other exception is logged and swallowed
  # so that one failing job does not end the loop.
  def process(job)
    @logger.log_job(job.name, job.last_execution_ended_at) { job.call }
  rescue Exception => e
    case e
    when NoMemoryError, SignalException, Interrupt, SystemExit
      raise
    else
      @logger.log_error(e)
    end
  end
end
# A named unit of work wrapping a block, together with the time at which it
# last finished running.
class Job
  attr_reader :name, :block
  attr_accessor :last_execution_ended_at

  # @param name [Object] label for the job
  # @yield the work to perform whenever the job is called
  # @raise [ArgumentError] if no block is given
  def initialize(name, &block)
    raise ArgumentError, "Block required" unless block_given?

    @last_execution_ended_at = nil
    @block = block
    @name = name
  end

  # Invokes the stored block, forwarding all arguments and any block.
  #
  # @return [Object] whatever the stored block returns
  def call(*args, &block)
    work = @block
    work.call(*args, &block)
  end
end
# Demo wiring: one shared queue and logger, served by a small pool of
# worker threads.
logger = Logger.new
pool_size = 2
job_queue = Queue.new

# fill queue with all jobs
demo_job = Job.new("demo") { puts "hi"; sleep 10 }
job_queue.push(demo_job)
# /fill

# An exception escaping any worker thread should take the whole process down.
Thread.abort_on_exception = true

# Each thread constructs its own Worker; construction enters the worker's
# processing loop on that thread.
workers = pool_size.times.map do
  Thread.new { Worker.new(job_queue, logger) }
end

# Drop into an interactive session on the main thread while the workers run.
binding.pry
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment