Skip to content

Instantly share code, notes, and snippets.

@lao
Created October 9, 2024 08:07
Show Gist options
  • Save lao/a68df027da73f796cba4f9f31a18f243 to your computer and use it in GitHub Desktop.
Save lao/a68df027da73f796cba4f9f31a18f243 to your computer and use it in GitHub Desktop.
# THIS IMPLEMENTATION IS PART OF A BLOG POST ABOUT:
# "Achieving Reliable Zero Downtime Re-indexing with Searchkick"
module Searchkick
# Module to check for reindexing and duplicate jobs to the new index
module ReindexCheck
def perform(*args, **options)
job_type = self.class.name.demodulize
class_name, index_name, record_ids = extract_job_details(job_type, args, options)
if reindexing?(class_name) && index_name != get_new_index_name(class_name)
new_index_name = get_new_index_name(class_name)
spawn_job_for_new_index(job_type, args, options, class_name, new_index_name, record_ids)
end
super(*args, **options)
end
private
# Extracts job details based on job type
def extract_job_details(job_type, args, options)
case job_type
when 'BulkReindexJob'
# args: [record_class.name, record_ids]
class_name = args[0]
record_ids = args[1]
index_name = options[:index_name] || class_name.constantize.search_index.name
when 'ProcessBatchJob'
# args: [class_name, batch]
class_name = args[0]
record_ids = args[1].map { |r| r['id'] }
index_name = options[:index_name] || class_name.constantize.search_index.name
when 'ProcessQueueJob'
# args: [class_name]
class_name = args[0]
record_ids = [] # Not directly available
index_name = options[:index_name] || class_name.constantize.search_index.name
when 'ReindexV2Job'
# args: [class_name, record_ids, options]
class_name = args[0]
record_ids = args[1]
index_name = options[:index_name] || options[:index_name] || class_name.constantize.search_index.name
else
raise "Unknown job type: #{job_type}"
end
[class_name, index_name, record_ids]
end
# Checks if reindexing is in progress for the given class
def reindexing?(class_name)
Redis.current.exists?("reindexing:"+class_name)
rescue Redis::BaseError => e
# Handle Redis connection error
Rails.logger.error "Redis error when checking reindexing status: #{e.message}"
false # Default to not reindexing to avoid inconsistencies
end
# Retrieves the new index name from Redis
def get_new_index_name(class_name)
Redis.current.get("reindexing:"+class_name+":new_index_name")
rescue Redis::BaseError => e
Rails.logger.error "Redis error when retrieving new index name: #{e.message}"
nil
end
# Spawns a duplicate job for the new index
def spawn_job_for_new_index(job_type, args, options, class_name, new_index_name, record_ids)
# Merge the new index name into options
new_options = options.merge(index_name: new_index_name)
case job_type
when 'BulkReindexJob'
Searchkick::BulkReindexJob.perform_later(*args, **new_options)
when 'ProcessBatchJob'
Searchkick::ProcessBatchJob.perform_later(*args, **new_options)
when 'ProcessQueueJob'
Searchkick::ProcessQueueJob.perform_later(*args, **new_options)
when 'ReindexV2Job'
Searchkick::ReindexV2Job.perform_later(*args, **new_options)
else
raise "Unknown job type: #{job_type}"
end
end
end
# Extend the relevant Searchkick job classes
JOBS_TO_OVERRIDE = [BulkReindexJob, ProcessBatchJob, ProcessQueueJob, ReindexV2Job]
JOBS_TO_OVERRIDE.each do |job_class|
job_class.class_eval do
prepend Searchkick::ReindexCheck
end
end
# Methods to manage reindexing flags and index names in Redis
class << self
# Flags the start of reindexing and stores the new index name
def flag_start_reindex(class_name, new_index_name)
Redis.current.set("reindexing:"+class_name, true)
Redis.current.set("reindexing:"+class_name+":new_index_name", new_index_name)
rescue Redis::BaseError => e
Rails.logger.error "Redis error when flagging start of reindex: #{e.message}"
raise e
end
# Flags the end of reindexing and clears the new index name
def flag_end_reindex(class_name)
Redis.current.del("reindexing:"+class_name)
Redis.current.del("reindexing:"+class_name+":new_index_name")
rescue Redis::BaseError => e
Rails.logger.error "Redis error when flagging end of reindex: #{e.message}"
end
# Retrieves the new index name for a class
def get_new_index_name(class_name)
Redis.current.get("reindexing:"+class_name+":new_index_name")
rescue Redis::BaseError => e
Rails.logger.error "Redis error when getting new index name: #{e.message}"
nil
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment