-
-
Save felippemr/6d39fcf35f51fb7a2e38 to your computer and use it in GitHub Desktop.
MongoDB serverStatus output plugin which works with mongoDB 3.0
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Fluent | |
class ServerStatusInput < Input | |
Plugin.register_input('serverstatus', self) | |
config_param :uris, :array, :default => nil | |
config_param :uri, :string | |
config_param :stats_interval, :time, :default => 60 # every minute | |
config_param :tag_prefix, :string, :default => "serverstatus" | |
def initialize | |
super | |
require 'mongo' | |
end | |
def configure(conf) | |
super | |
unless @uris or @uri | |
raise ConfigError, 'uris or uri must be specified' | |
end | |
if @uris.nil? | |
@uris = [@uri] | |
end | |
@conns = get_conection | |
end | |
def get_conection | |
@uris.map do |uri_str| | |
uri_str = "mongodb://#{uri_str}" if not uri_str.start_with?("mongodb://") | |
uri = get_uri(uri_str) | |
client = Mongo::Client.new(uri.servers, uri.options) | |
[client, uri] | |
end | |
end | |
def get_uri(uri_str) | |
if uri_str.include?('@') | |
user, password = uri_str.split('@')[0].split('//')[1].split(':') | |
uri = Mongo::URI.new(uri_str, :connect => :direct, :user => user, :password => password) | |
else | |
uri = Mongo::URI.new(uri_str, :connect => :direct) | |
end | |
uri | |
end | |
def start | |
@loop = Coolio::Loop.new | |
tw = TimerWatcher.new(@stats_interval, true, @log, &method(:collect_serverstatus)) | |
tw.attach(@loop) | |
@thread = Thread.new(&method(:run)) | |
end | |
def run | |
@loop.run | |
rescue | |
log.error "unexpected error", :error=>$!.to_s | |
log.error_backtrace | |
end | |
def shutdown | |
@loop.stop | |
@thread.join | |
end | |
def reconnect | |
for conn, conn_uri in @conns | |
conn.reconnect | |
end | |
end | |
def collect_serverstatus | |
begin | |
for conn, conn_uri in @conns | |
database = conn.database | |
stats = database.command(:serverStatus => :true).first | |
make_data_msgpack_compatible(stats) | |
host, port = conn_uri.servers[0].split(':') | |
tag = [@tag_prefix, host.gsub(/[\.-]/, "_"), port].join(".") | |
Engine.emit(tag, Engine.now, stats) | |
end | |
rescue => e | |
log.error "failed to collect MongoDB stats", :error_class => e.class, :error => e | |
log.info "trying to reconnect on mongodb..." | |
reconnect | |
end | |
end | |
# MessagePack doesn't like it when the field is of Time class. | |
# This is a convenient method that traverses through the | |
# getServerStatus response and update any field that is of Time class. | |
def make_data_msgpack_compatible(data) | |
if [Hash, BSON::Document].include?(data.class) | |
data.each {|k, v| | |
if v.respond_to?(:each) | |
make_data_msgpack_compatible(v) | |
elsif v.class == Time | |
data[k] = v.to_i | |
elsif v.class == BSON::ObjectId | |
data[k] = v.to_s | |
end | |
} | |
# serverStatus's "locks" field has "." as a key, which can't be | |
# inserted back to MongoDB withou wreaking havoc. Replace it with | |
# "global" | |
data["global"] = data.delete(".") if data["."] | |
elsif data.class == Array | |
data.each_with_index { |v, i| | |
if v.respond_to?(:each) | |
make_data_msgpack_compatible(v) | |
elsif v.class == Time | |
data[i] = v.to_i | |
end | |
} | |
end | |
end | |
class TimerWatcher < Coolio::TimerWatcher | |
def initialize(interval, repeat, log, &callback) | |
@callback = callback | |
@log = log | |
super(interval, repeat) | |
end | |
def on_timer | |
@callback.call | |
rescue | |
@log.error $!.to_s | |
@log.error_backtrace | |
end | |
end | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment