Skip to content

Instantly share code, notes, and snippets.

@eranharel
Created July 29, 2013 10:12
Show Gist options
  • Select an option

  • Save eranharel/6103369 to your computer and use it in GitHub Desktop.

Select an option

Save eranharel/6103369 to your computer and use it in GitHub Desktop.
Netty Graphite Client
package com.outbrain.gruffalo.publish;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
* @author Eran Harel
*/
class GraphiteMetricsPublisher implements MetricsPublisher {
private static final Logger log = LoggerFactory.getLogger(GraphiteHandler.class);
private static final int RECONNECT_DELAY_SEC = 5;
private final String graphiteHost;
private final int graphitePort;
private final EventLoopGroup eventLoopGroup;
private final ChannelHandler graphiteHandler = new GraphiteHandler();
private final GraphiteMetricsPublisher.GraphiteClientChannelInitializer channelInitializer = new GraphiteClientChannelInitializer();
private Channel channel;
public GraphiteMetricsPublisher(final String graphiteHost, final int graphitePort, final EventLoopGroup eventLoopGroup) {
this.graphiteHost = graphiteHost;
this.graphitePort = graphitePort;
this.eventLoopGroup = eventLoopGroup;
}
private Bootstrap configureBootstrap(EventLoopGroup eventLoopGroup) {
Bootstrap bootstrap = new Bootstrap();
bootstrap.remoteAddress(graphiteHost, graphitePort);
bootstrap.group(eventLoopGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(channelInitializer);
return bootstrap;
}
public void connect() {
channel = configureBootstrap(eventLoopGroup).connect().channel();
}
@Override
public void publishMetrics(String metrics) {
channel.writeAndFlush(metrics);
}
private class GraphiteClientChannelInitializer extends ChannelInitializer<Channel> {
private final StringDecoder DECODER = new StringDecoder();
private final StringEncoder ENCODER = new StringEncoder();
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("decoder", DECODER);
pipeline.addLast("encoder", ENCODER); // we don't really read responses...
pipeline.addLast("handler", graphiteHandler);
}
}
@ChannelHandler.Sharable
private class GraphiteHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
log.warn("Got an unexpected downstream message: " + msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("Connected to: {}", ctx.channel().remoteAddress());
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
log.warn("Got disconnected... will try to reconnect in {} sec...", RECONNECT_DELAY_SEC);
final EventLoop loop = ctx.channel().eventLoop();
loop.schedule(new Runnable() {
@Override
public void run() {
log.info("Reconnecting to {}:{}", graphiteHost, graphitePort);
connect();
}
}, RECONNECT_DELAY_SEC, TimeUnit.SECONDS);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("Unexpected exception from downstream.", cause);
ctx.close();
}
}
}
@normanmaurer

Copy link
Copy Markdown

GraphiteHandler instance can be static and so shared

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment