Created
June 25, 2014 15:43
-
-
Save chrisckchang/943a69b02f3435281557 to your computer and use it in GitHub Desktop.
MongoDB serverStatus output plugin
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Fluent | |
class ServerStatusInput < Input | |
Plugin.register_input('serverstatus', self) | |
config_param :uris, :array, :default => nil | |
config_param :uri, :string, :default => "mongodb://localhost:27017" | |
config_param :stats_interval, :time, :default => 60 # every minute | |
config_param :tag_prefix, :string, :default => "serverstatus" | |
def initialize | |
super | |
require 'mongo' | |
end | |
def configure(conf) | |
super | |
unless @uris or @uri | |
raise ConfigError, 'uris or uri must be specified' | |
end | |
if @uris.nil? | |
@uris = [@uri] | |
end | |
@conns = @uris.map do |uri_str| | |
uri_str = "mongodb://#{uri_str}" if not uri_str.start_with?("mongodb://") | |
uri = Mongo::URIParser.new(uri_str) | |
[Mongo::MongoClient.from_uri(uri_str), uri] | |
end | |
end | |
def start | |
@loop = Coolio::Loop.new | |
tw = TimerWatcher.new(@stats_interval, true, @log, &method(:collect_serverstatus)) | |
tw.attach(@loop) | |
@thread = Thread.new(&method(:run)) | |
end | |
def run | |
@loop.run | |
rescue | |
log.error "unexpected error", :error=>$!.to_s | |
log.error_backtrace | |
end | |
def shutdown | |
@loop.stop | |
@thread.join | |
end | |
def collect_serverstatus | |
begin | |
for conn, conn_uri in @conns | |
stats = conn.db('admin').command(:serverStatus => true) | |
make_data_msgpack_compatible(stats) | |
tag = [@tag_prefix, conn_uri.host.gsub(/[\.-]/, "_"), conn_uri.port].join(".") | |
Engine.emit(tag, Engine.now, stats) | |
end | |
rescue => e | |
log.error "failed to collect MongoDB stats", :error_class => e.class, :error => e | |
end | |
end | |
# MessagePack doesn't like it when the field is of Time class. | |
# This is a convenient method that traverses through the | |
# getServerStatus response and update any field that is of Time class. | |
def make_data_msgpack_compatible(data) | |
if [Hash, BSON::OrderedHash].include?(data.class) | |
data.each {|k, v| | |
if v.respond_to?(:each) | |
make_data_msgpack_compatible(v) | |
elsif v.class == Time | |
data[k] = v.to_i | |
end | |
} | |
# serverStatus's "locks" field has "." as a key, which can't be | |
# inserted back to MongoDB withou wreaking havoc. Replace it with | |
# "global" | |
data["global"] = data.delete(".") if data["."] | |
elsif data.class == Array | |
data.each_with_index { |v, i| | |
if v.respond_to?(:each) | |
make_data_msgpack_compatible(v) | |
elsif v.class == Time | |
data[i] = v.to_i | |
end | |
} | |
end | |
end | |
class TimerWatcher < Coolio::TimerWatcher | |
def initialize(interval, repeat, log, &callback) | |
@callback = callback | |
@log = log | |
super(interval, repeat) | |
end | |
def on_timer | |
@callback.call | |
rescue | |
@log.error $!.to_s | |
@log.error_backtrace | |
end | |
end | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment