Created
May 24, 2015 10:03
-
-
Save lmmendes/4d3e828984cd9df5d1d5 to your computer and use it in GitHub Desktop.
Monkey patch for Mongoid 4.x "Could not connect to a primary node for replica set"
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#================================================================================================== | |
# Monkey Patch (lmmendes) | |
# MongoDB or in this case Mongoid and it's driver Moped have problems running some commands | |
# when the Primary node goes down and Moped tries to write to the database before | |
# refreshing the cluster info or at least trying the same command on each node before failing. | |
# From GitHub: | |
# Moped::Errors::ConnectionFailure: Could not connect to a primary node for replica set | |
# https://github.com/mongoid/moped/issues/348 | |
#================================================================================================== | |
require 'mongoid' | |
if Mongoid::VERSION >= '5.0.0' | |
raise "Please remove this patch for Mongoid 4.x before upgrading." | |
end | |
# ================================================================================================== | |
# Patch from | |
# https://github.com/wandenberg/moped/commit/fe36cd65b105d83253e25d0e474711244ae31302 | |
# ================================================================================================== | |
require 'moped' | |
require "moped/query" | |
# ================================================================================================== | |
# From | |
# https://raw.githubusercontent.com/wandenberg/moped/fe36cd65b105d83253e25d0e474711244ae31302/lib/moped/retryable.rb | |
# ================================================================================================== | |
module Moped | |
# Provides the shared behaviour for retry failed operations. | |
# | |
# @since 2.0.0 | |
module Retryable | |
private | |
# Execute the provided block on the cluster and retry if the execution | |
# fails. | |
# | |
# @api private | |
# | |
# @example Execute with retry. | |
# preference.with_retry(cluster) do | |
# cluster.with_primary do |node| | |
# node.refresh | |
# end | |
# end | |
# | |
# @param [ Cluster ] cluster The cluster. | |
# @param [ Integer ] retries The number of times to retry. | |
# | |
# @return [ Object ] The result of the block. | |
# | |
# @since 2.0.0 | |
def with_retry(cluster, retries = cluster.max_retries, &block) | |
begin | |
block.call | |
rescue Errors::ConnectionFailure, Errors::PotentialReconfiguration => e | |
raise e if e.is_a?(Errors::PotentialReconfiguration) && | |
! (e.message.include?("not master") || e.message.include?("Not primary")) | |
if retries > 0 | |
Loggable.warn(" MOPED:", "Retrying connection attempt #{retries} more time(s).", "n/a") | |
sleep(cluster.retry_interval) | |
cluster.refresh | |
with_retry(cluster, retries - 1, &block) | |
else | |
raise e | |
end | |
end | |
end | |
end | |
end | |
# ================================================================================================== | |
# From | |
# https://raw.githubusercontent.com/wandenberg/moped/fe36cd65b105d83253e25d0e474711244ae31302/lib/moped/collection.rb | |
# ================================================================================================== | |
module Moped | |
# The class for interacting with a MongoDB collection. | |
# | |
# @since 1.0.0 | |
class Collection | |
include Readable | |
include Retryable | |
# @!attribute database | |
# @return [ Database ] The database for the collection. | |
# @!attribute name | |
# @return [ String ] The name of the collection. | |
attr_reader :database, :name | |
# Return whether or not this collection is a capped collection. | |
# | |
# @example Is the collection capped? | |
# collection.capped? | |
# | |
# @return [ true, false ] If the collection is capped. | |
# | |
# @since 1.4.0 | |
def capped? | |
database.command(collstats: name)["capped"] | |
end | |
# Drop the collection. | |
# | |
# @example Drop the collection. | |
# collection.drop | |
# | |
# @return [ Hash ] The command information. | |
# | |
# @since 1.0.0 | |
def drop | |
begin | |
session.with(read: :primary).command(drop: name) | |
rescue Moped::Errors::OperationFailure => e | |
raise e unless e.ns_not_found? | |
false | |
end | |
end | |
# Rename the collection | |
# | |
# @example Rename the collection to 'foo' | |
# collection.rename('foo') | |
# | |
# @return [ Hash ] The command information. | |
# | |
# @since 2.0.0 | |
def rename(to_name) | |
begin | |
session. | |
with(database: "admin", read: :primary). | |
command(renameCollection: "#{database.name}.#{name}", to: "#{database.name}.#{to_name}") | |
rescue Moped::Errors::OperationFailure => e | |
raise e unless e.ns_not_exists? | |
false | |
end | |
end | |
# Build a query for this collection. | |
# | |
# @example Build a query based on the provided selector. | |
# collection.find(name: "Placebo") | |
# | |
# @param [ Hash ] selector The query selector. | |
# | |
# @return [ Query ] The generated query. | |
# | |
# @since 1.0.0 | |
def find(selector = {}) | |
Query.new(self, selector) | |
end | |
alias :where :find | |
# Access information about this collection's indexes. | |
# | |
# @example Get the index information. | |
# collection.indexes | |
# | |
# @return [ Indexes ] The index information. | |
# | |
# @since 1.0.0 | |
def indexes | |
Indexes.new(database, name) | |
end | |
# Initialize the new collection. | |
# | |
# @example Initialize the collection. | |
# Collection.new(database, :artists) | |
# | |
# @param [ Database ] database The collection's database. | |
# @param [ String, Symbol] name The collection name. | |
# | |
# @since 1.0.0 | |
def initialize(database, name) | |
@database = database | |
@name = name.to_s | |
end | |
# Insert one or more documents into the collection. | |
# | |
# @example Insert a single document. | |
# db[:people].insert(name: "John") | |
# | |
# @example Insert multiple documents in batch. | |
# db[:people].insert([{name: "John"}, {name: "Joe"}]) | |
# | |
# @param [ Hash, Array<Hash> ] documents The document(s) to insert. | |
# @param [ Array ] flags The flags, valid values are :continue_on_error. | |
# | |
# @option options [Array] :continue_on_error Whether to continue on error. | |
# | |
# @return [ nil ] nil. | |
# | |
# @since 1.0.0 | |
def insert(documents, flags = nil) | |
with_retry(cluster) do | |
docs = documents.is_a?(Array) ? documents : [ documents ] | |
cluster.with_primary do |node| | |
node.insert(database.name, name, docs, write_concern, flags: flags || []) | |
end | |
end | |
end | |
# Call aggregate function over the collection. | |
# | |
# @example Execute an aggregation. | |
# session[:users].aggregate({ | |
# "$group" => { | |
# "_id" => "$city", | |
# "totalpop" => { "$sum" => "$pop" } | |
# } | |
# }) | |
# | |
# @param [ Hash, Array<Hash> ] documents representing the aggregate | |
# function to execute | |
# | |
# @return [ Hash ] containing the result of aggregation | |
# | |
# @since 1.3.0 | |
def aggregate(*pipeline) | |
session.command(aggregate: name, pipeline: pipeline.flatten)["result"] | |
end | |
# Get the session for the collection. | |
# | |
# @example Get the session for the collection. | |
# collection.session | |
# | |
# @return [ Session ] The session for the collection. | |
# | |
# @since 2.0.0 | |
def session | |
database.session | |
end | |
def write_concern | |
session.write_concern | |
end | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment