Created
December 24, 2013 23:25
-
-
Save tcrayford/8118762 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.aphyr.riemann.client.EventDSL; | |
import com.aphyr.riemann.client.RiemannClient; | |
import com.yammer.metrics.Metrics; | |
import com.yammer.metrics.core.*; | |
import com.yammer.metrics.stats.Snapshot; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.io.IOException; | |
import java.net.InetSocketAddress; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.Collection; | |
import java.util.List; | |
import java.util.Map.Entry; | |
import java.util.SortedMap; | |
import java.util.concurrent.TimeUnit; | |
import com.yammer.metrics.reporting.AbstractPollingReporter; | |
public class RiemannReporter extends AbstractPollingReporter implements MetricProcessor<Long> { | |
private static final Logger LOG = LoggerFactory.getLogger(RiemannReporter.class); | |
public final RiemannClient riemann; | |
public final Config c; | |
public static class Config { | |
public final MetricsRegistry metricsRegistry; | |
public final MetricPredicate predicate; | |
public final boolean printVMMetrics; | |
public final String host; | |
public final int port; | |
public final long period; | |
public final TimeUnit unit; | |
public final String prefix; | |
public final String separator; | |
public final VirtualMachineMetrics vm = VirtualMachineMetrics.getInstance(); | |
public final Clock clock; | |
public final String name; | |
public final String localHost; | |
public final List<String> tags; | |
private Config(MetricsRegistry metricsRegistry, MetricPredicate predicate, boolean printVMMetrics, | |
String host, int port, long period, TimeUnit unit, String prefix, String separator, | |
Clock clock, String name, String localHost, List<String> tags) { | |
this.metricsRegistry = metricsRegistry; | |
this.predicate = predicate; | |
this.printVMMetrics = printVMMetrics; | |
this.host = host; | |
this.port = port; | |
this.period = period; | |
this.unit = unit; | |
this.prefix = prefix; | |
this.separator = separator; | |
this.clock = clock; | |
this.name = name; | |
this.localHost = localHost; | |
this.tags = tags; | |
} | |
public static ConfigBuilder newBuilder() { | |
return new ConfigBuilder(); | |
} | |
@Override | |
public String toString() { | |
return "Config{" + | |
"print_metrics:" + printVMMetrics + ", host:" + host + ", port:" + port + ", period:" + period + | |
", unit:" + unit + ", prefix:" + prefix + ", separator:" + separator + ", clock:" + clock + | |
", name:" + name + ", localhost:" + localHost + ", metrics_registry:" + metricsRegistry + | |
", predicate:" + predicate + ", tags:" + Arrays.toString(tags.toArray()) + "}"; | |
} | |
} | |
public static final class ConfigBuilder { | |
// Default values for Config | |
private MetricsRegistry metricsRegistry = Metrics.defaultRegistry(); | |
private MetricPredicate predicate = MetricPredicate.ALL; | |
private boolean printVMMetrics = true; | |
private String host = "localhost"; | |
private int port = 5555; | |
private long period = 60; | |
private TimeUnit unit = TimeUnit.SECONDS; | |
private String prefix = null; | |
private String separator = " "; | |
private final VirtualMachineMetrics vm = VirtualMachineMetrics.getInstance(); | |
private Clock clock = Clock.defaultClock(); | |
private String name = "riemann-reporter"; | |
private String localHost = null; | |
private List<String> tags = new ArrayList<String>(); | |
private ConfigBuilder() { } | |
public Config build(){ | |
return new Config(metricsRegistry, predicate, printVMMetrics, host, port, | |
period, unit, prefix, separator, clock, name, localHost, tags); | |
} | |
public ConfigBuilder metricsRegistry(MetricsRegistry r) { metricsRegistry = r; return this; } | |
public ConfigBuilder predicate(MetricPredicate p) { predicate = p; return this; } | |
public ConfigBuilder printVMMetrics(Boolean p) { printVMMetrics = p; return this; } | |
public ConfigBuilder host(String h) { host = h; return this; } | |
public ConfigBuilder port(int p) { port = p; return this; } | |
public ConfigBuilder period(long p) { period = p; return this; } | |
public ConfigBuilder unit(TimeUnit t) { unit = t; return this; } | |
public ConfigBuilder prefix(String p) { prefix = p; return this; } | |
public ConfigBuilder separator(String s) { separator = s; return this; } | |
public ConfigBuilder clock(Clock c) { clock = c; return this; } | |
public ConfigBuilder name(String n) { name = n; return this; } | |
public ConfigBuilder localHost(String l) { localHost = l; return this; } | |
public ConfigBuilder tags(Collection<String> ts) { tags.clear(); tags.addAll(ts); return this; } | |
} | |
public static void enable(final Config config) { | |
try { | |
if (config == null) | |
throw new IllegalArgumentException("Config cannot be null"); | |
final RiemannReporter reporter = new RiemannReporter(config); | |
reporter.start(config.period, config.unit); | |
} catch (Exception e) { | |
LOG.error("Error creating/starting Riemann reporter: ", e); | |
} | |
} | |
public RiemannReporter(final Config c) throws IOException { | |
this(c, RiemannClient.tcp(new InetSocketAddress(c.host, c.port))); | |
riemann.connect(); | |
} | |
public RiemannReporter(final Config c, final RiemannClient riemann) { | |
super(c.metricsRegistry, c.name); | |
this.riemann = riemann; | |
this.c = c; | |
} | |
@Override | |
public void run() { | |
try { | |
final long epoch = c.clock.time() / 1000; | |
if (c.printVMMetrics) { | |
sendVMMetrics(epoch); | |
} | |
sendRegularMetrics(epoch); | |
} catch (Exception e) { | |
LOG.warn("Error writing to Riemann", e); | |
} | |
} | |
protected void sendRegularMetrics(final Long epoch) { | |
for (Entry<String,SortedMap<MetricName,Metric>> entry : getMetricsRegistry().groupedMetrics(c.predicate).entrySet()) { | |
for (Entry<MetricName, Metric> subEntry : entry.getValue().entrySet()) { | |
final Metric metric = subEntry.getValue(); | |
if (metric != null) { | |
try { | |
metric.processWith(this, subEntry.getKey(), epoch); | |
} catch (Exception ignored) { | |
LOG.error("Error sending regular metric:", ignored); | |
} | |
} | |
} | |
} | |
} | |
private EventDSL newEvent() { | |
EventDSL event = riemann.event(); | |
if (c.localHost != null) { | |
event.host(c.localHost); | |
} | |
if (!c.tags.isEmpty()) { | |
event.tags(c.tags); | |
} | |
return event; | |
} | |
// The service name for a given metric. | |
public String service(MetricName name, String... rest) { | |
final StringBuilder sb = new StringBuilder(); | |
if (null != c.prefix) { | |
sb.append(c.prefix).append(c.separator); | |
} | |
sb.append(name.getGroup()) | |
.append(c.separator) | |
.append(name.getType()) | |
.append(c.separator); | |
if (name.hasScope()) { | |
sb.append(name.getScope()) | |
.append(c.separator); | |
} | |
sb.append(name.getName()); | |
for (String part : rest) { | |
sb.append(c.separator); | |
sb.append(part); | |
} | |
return sb.toString(); | |
} | |
public String service(String... parts) { | |
final StringBuilder sb = new StringBuilder(); | |
if (null != c.prefix) { | |
sb.append(c.prefix).append(c.separator); | |
} | |
for (String p : parts) { | |
sb.append(p).append(c.separator); | |
} | |
return sb.substring(0, sb.length() - c.separator.length()); | |
} | |
@Override | |
public void processGauge(MetricName name, Gauge<?> gauge, Long epoch) { | |
Object v = gauge.value(); | |
EventDSL e = newEvent().service(service(name)).time(epoch).tag("gauge"); | |
if (v instanceof Integer) { | |
e.metric((Integer) v).send(); | |
} else if (v instanceof Long) { | |
e.metric((Long) v).send(); | |
} else if (v instanceof Double) { | |
e.metric((Double) v).send(); | |
} else if (v instanceof Float) { | |
e.metric((Float) v).send(); | |
} else if (v instanceof Number) { | |
e.metric(((Number) v).floatValue()).send(); | |
} | |
} | |
@Override | |
public void processCounter(MetricName name, Counter counter, Long epoch) { | |
newEvent() | |
.service(service(name)) | |
.metric(counter.count()) | |
.time(epoch) | |
.tag("counter") | |
.send(); | |
} | |
@Override | |
public void processMeter(MetricName name, Metered meter, Long epoch) { | |
newEvent() | |
.service(service(name)) | |
.metric(meter.oneMinuteRate()) | |
.time(epoch) | |
.tag("meter") | |
.send(); | |
} | |
@Override | |
public void processHistogram(MetricName name, Histogram histogram, Long epoch) throws IOException { | |
final String service = service(name); | |
sendSummary(name, histogram, epoch); | |
sendSample(name, histogram, epoch); | |
} | |
@Override | |
public void processTimer(MetricName name, Timer timer, Long epoch) { | |
processMeter(name, timer, epoch); | |
sendSummary(name, timer, epoch); | |
sendSample(name, timer, epoch); | |
} | |
protected void sendVMMetrics(long epoch) { | |
newEvent().time(epoch).service(service("jvm", "memory", "heap-usage")).metric(c.vm.heapUsage()).send(); | |
newEvent().time(epoch).service(service("jvm", "memory", "non-heap-usage")).metric(c.vm.nonHeapUsage()).send(); | |
for (Entry<String, Double> pool : c.vm.memoryPoolUsage().entrySet()) { | |
newEvent().time(epoch).service(service("jvm", "memory", "pool-usage", pool.getKey())).metric(pool.getValue()).send(); | |
} | |
newEvent().time(epoch).service(service("jvm", "thread", "daemon-count")).metric(c.vm.daemonThreadCount()).send(); | |
newEvent().time(epoch).service(service("jvm", "thread", "count")).metric(c.vm.threadCount()).send(); | |
newEvent().time(epoch).service(service("jvm", "uptime")).metric(c.vm.uptime()).send(); | |
newEvent().time(epoch).service(service("jvm", "fd-usage")).metric(c.vm.fileDescriptorUsage()).send(); | |
for(Entry<Thread.State, Double> entry : c.vm.threadStatePercentages().entrySet()) { | |
newEvent().time(epoch).service(service("jvm", "thread", "state", entry.getKey().toString().toLowerCase())).metric(entry.getValue()).send(); | |
} | |
for(Entry<String, VirtualMachineMetrics.GarbageCollectorStats> entry : c.vm.garbageCollectors().entrySet()) { | |
newEvent().time(epoch).service(service("jvm", "gc", entry.getKey(), "time")).metric(entry.getValue().getTime(TimeUnit.MILLISECONDS)).send(); | |
newEvent().time(epoch).service(service("jvm", "gc", entry.getKey(), "runs")).metric(entry.getValue().getRuns()).send(); | |
} | |
} | |
public void sendSummary(MetricName name, Summarizable metric, Long epoch) { | |
newEvent().time(epoch).tag("histogram_summary").service(service(name, "min")).metric(metric.min()).send(); | |
newEvent().time(epoch).tag("histogram_summary").service(service(name, "max")).metric(metric.max()).send(); | |
newEvent().time(epoch).tag("histogram_summary").service(service(name, "mean")).metric(metric.mean()).send(); | |
newEvent().time(epoch).tag("histogram_summary").service(service(name, "stddev")).metric(metric.stdDev()).send(); | |
} | |
public void sendSample(MetricName name, Sampling metric, Long epoch) { | |
final Snapshot s = metric.getSnapshot(); | |
newEvent().time(epoch).tag("histogram_sample").service(service(name, ".5")).metric(s.getMedian()).send(); | |
newEvent().time(epoch).tag("histogram_sample").service(service(name, ".75")).metric(s.get75thPercentile()).send(); | |
newEvent().time(epoch).tag("histogram_sample").service(service(name, ".95")).metric(s.get95thPercentile()).send(); | |
newEvent().time(epoch).tag("histogram_sample").service(service(name, ".98")).metric(s.get98thPercentile()).send(); | |
newEvent().time(epoch).tag("histogram_sample").service(service(name, ".99")).metric(s.get99thPercentile()).send(); | |
newEvent().time(epoch).tag("histogram_sample").service(service(name, ".999")).metric(s.get999thPercentile()).send(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment