Created
October 9, 2024 08:07
-
-
Save lao/a68df027da73f796cba4f9f31a18f243 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# THIS IMPLEMENTATION IS PART OF A BLOG POST ABOUT:
# "Achieving Reliable Zero Downtime Re-indexing with Searchkick"
module Searchkick | |
# Job mixin that checks whether a reindex is in progress and, if so, enqueues a
# duplicate of the current job that targets the new index.
#
# It is meant to be prepended onto the Searchkick job classes: `perform` first
# (optionally) enqueues the duplicate, then runs the original job via `super`.
module ReindexCheck
  # Runs the job, mirroring it onto the new index while a reindex is flagged.
  #
  # A duplicate is enqueued only when all of the following hold:
  # * the "reindexing:<class_name>" flag exists in Redis,
  # * a non-empty new index name is stored for the class, and
  # * this job is not already targeting that new index.
  #
  # @param args [Array] positional job arguments; args[0] is the model class name
  # @param options [Hash] keyword job arguments; :index_name selects the target index
  # @raise [RuntimeError] when the job class name is not one of the supported jobs
  def perform(*args, **options)
    job_type = self.class.name.demodulize
    class_name, index_name, record_ids = extract_job_details(job_type, args, options)

    # Fetch the name once. It stays nil when no reindex is flagged.
    new_index_name = get_new_index_name(class_name) if reindexing?(class_name)

    # Without a usable name the duplicate would be enqueued with
    # `index_name: nil`, fall back to the default index and re-enter this
    # branch, re-spawning itself for as long as the flag is set.
    if !new_index_name.to_s.empty? && index_name != new_index_name
      spawn_job_for_new_index(job_type, args, options, class_name, new_index_name, record_ids)
    end

    super(*args, **options)
  end

  private

  # Extracts job details based on job type.
  #
  # @param job_type [String] demodulized job class name
  # @param args [Array] positional job arguments
  # @param options [Hash] keyword job arguments
  # @return [Array] `[class_name, index_name, record_ids]`
  # @raise [RuntimeError] for an unknown job type
  def extract_job_details(job_type, args, options)
    class_name = args[0]

    record_ids =
      case job_type
      when 'BulkReindexJob', 'ReindexV2Job'
        # args: [class_name, record_ids]
        # NOTE(review): the original comment listed a third positional
        # `options` argument for ReindexV2Job; only the keyword options are
        # consulted for :index_name here - confirm against the job signature.
        args[1]
      when 'ProcessBatchJob'
        # args: [class_name, batch]
        args[1].map { |record| record['id'] }
      when 'ProcessQueueJob'
        # args: [class_name]; record ids are not directly available
        []
      else
        raise "Unknown job type: #{job_type}"
      end

    # An explicit :index_name wins; otherwise use the model's current index.
    index_name = options[:index_name] || class_name.constantize.search_index.name

    [class_name, index_name, record_ids]
  end

  # Checks if reindexing is in progress for the given class.
  #
  # @return [Boolean] false when the flag is absent or Redis raised
  def reindexing?(class_name)
    Redis.current.exists?("reindexing:#{class_name}")
  rescue Redis::BaseError => e
    # Handle Redis connection error
    Rails.logger.error "Redis error when checking reindexing status: #{e.message}"
    false # Default to not reindexing to avoid inconsistencies
  end

  # Retrieves the new index name from Redis.
  #
  # @return [String, nil] nil when the key is missing or Redis raised
  def get_new_index_name(class_name)
    Redis.current.get("reindexing:#{class_name}:new_index_name")
  rescue Redis::BaseError => e
    Rails.logger.error "Redis error when retrieving new index name: #{e.message}"
    nil
  end

  # Spawns a duplicate job for the new index.
  #
  # `class_name` and `record_ids` are not used here; they are kept so the
  # method signature stays unchanged.
  #
  # @raise [RuntimeError] for an unknown job type
  def spawn_job_for_new_index(job_type, args, options, class_name, new_index_name, record_ids)
    job_class =
      case job_type
      when 'BulkReindexJob' then Searchkick::BulkReindexJob
      when 'ProcessBatchJob' then Searchkick::ProcessBatchJob
      when 'ProcessQueueJob' then Searchkick::ProcessQueueJob
      when 'ReindexV2Job' then Searchkick::ReindexV2Job
      else raise "Unknown job type: #{job_type}"
      end

    # Merge the new index name into options
    job_class.perform_later(*args, **options.merge(index_name: new_index_name))
  end
end
# Searchkick job classes that get Searchkick::ReindexCheck prepended, so the
# module's `perform` runs before each job's own `perform`.
# Frozen so the list cannot be mutated after load.
JOBS_TO_OVERRIDE = [BulkReindexJob, ProcessBatchJob, ProcessQueueJob, ReindexV2Job].freeze
JOBS_TO_OVERRIDE.each { |job_class| job_class.prepend(Searchkick::ReindexCheck) }
# Methods to manage reindexing flags and index names in Redis.
#
# Keys used (per model class name):
# * "reindexing:<class_name>"                - presence marks a reindex in progress
# * "reindexing:<class_name>:new_index_name" - name of the index being built
class << self
  # Flags the start of reindexing and stores the new index name.
  #
  # The index name is written before the flag, so a reader that observes the
  # flag can always find the name, even if the second write fails.
  #
  # @param class_name [String] model class name
  # @param new_index_name [String] name of the index being built
  # @raise [Redis::BaseError] re-raised after logging when Redis fails
  def flag_start_reindex(class_name, new_index_name)
    Redis.current.set("reindexing:#{class_name}:new_index_name", new_index_name)
    Redis.current.set("reindexing:#{class_name}", true)
  rescue Redis::BaseError => e
    Rails.logger.error "Redis error when flagging start of reindex: #{e.message}"
    raise
  end

  # Flags the end of reindexing and clears the new index name.
  #
  # The flag is removed first so readers stop looking for the name before it
  # disappears. Redis errors are logged and swallowed (best effort).
  # NOTE(review): a swallowed failure leaves the flag set - confirm that is
  # acceptable for callers.
  #
  # @param class_name [String] model class name
  def flag_end_reindex(class_name)
    Redis.current.del("reindexing:#{class_name}")
    Redis.current.del("reindexing:#{class_name}:new_index_name")
  rescue Redis::BaseError => e
    Rails.logger.error "Redis error when flagging end of reindex: #{e.message}"
  end

  # Retrieves the new index name for a class.
  #
  # @param class_name [String] model class name
  # @return [String, nil] nil when the key is missing or Redis raised
  def get_new_index_name(class_name)
    Redis.current.get("reindexing:#{class_name}:new_index_name")
  rescue Redis::BaseError => e
    Rails.logger.error "Redis error when getting new index name: #{e.message}"
    nil
  end
end
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment