Skip to content

Instantly share code, notes, and snippets.

@mperham
Created October 21, 2024 15:18

Revisions

  1. mperham created this gist Oct 21, 2024.
    95 changes: 95 additions & 0 deletions sidekiq_cluster
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,95 @@
    #!/usr/bin/env ruby

    require 'sidekiq'
    require 'sidekiq/cli'

    # Default to running one process per core
    def process_count
    return ENV['SIDEKIQ_COUNT'].to_i unless ENV['SIDEKIQ_COUNT'].to_i == 0

    require 'etc'
    Etc.nprocessors
    end

    def log
    $stderr.write "Supervisor[#{$$}] "
    $stderr.puts yield
    end

    def handle_sig(sig)
    # If we're shutting down, we don't need to respawn child processes that die
    if sig == "INT" || sig == "TERM"
    @watch_children = false
    end

    @pids.each do |pid|
    Process.kill(sig, pid)
    end
    end

    def fork_child
    Process.fork do
    begin
    cli = Sidekiq::CLI.instance
    cli.parse
    cli.run
    rescue => e
    raise e if $DEBUG
    log { e.message }
    log { e.backtrace.join("\n") }
    exit 1
    end
    end
    end

    def start_children
    process_count.times.map do
    fork_child
    end
    end

    # Limit each child process to a chunk of RAM
    @memory_percentage_limit = ENV['SIDEKIQ_MB_LIMIT'].nil? ? 2048 : ENV['SIDEKIQ_MB_LIMIT'].to_i

    count = process_count
    log { "Starting Mastodon Sidekiq cluster with #{process_count} process#{count > 1 ? "es" : ""}" }

    @pids = start_children
    @watch_children = true
    sleep 1

    Thread.new do
    log { "Monitoring child PIDs: #{@pids}" }
    while @watch_children do
    @pids.each do |pid|
    memory_percent_used = `ps -o %mem= -p #{pid}`.to_f
    if memory_percent_used == 0.0 # child died
    @pids.delete(pid)
    @pids << new_pid = fork_child
    log { "Replaced lost PID #{pid} with #{new_pid}" }
    elsif memory_percent_used > @memory_percentage_limit
    log { "PID #{pid} crossed memory threshold -- replacing" }
    @pids.delete(pid)
    Process.kill("TSTP", pid)
    sleep 5
    Process.kill("TERM", pid)
    @pids << new_pid = fork_child
    elsif $DEBUG
    log { "#{pid}: #{memory_percent_used.round(2)}" }
    end
    end
    puts "" if $DEBUG
    sleep 10
    end
    log { "Child supervisor stopping" }
    end

    %w(INT TSTP TERM).each do |sig|
    Signal.trap(sig) do
    handle_sig(sig)
    end
    end

    Process.waitall

    log { "Stopping Sidekiq cluster" }