Skip to content

Instantly share code, notes, and snippets.

@oli-h
Last active September 6, 2018 11:33
Show Gist options
  • Save oli-h/e44c9e42033bd10945dc6c6afa460fcb to your computer and use it in GitHub Desktop.
Save oli-h/e44c9e42033bd10945dc6c6afa460fcb to your computer and use it in GitHub Desktop.
Reproducer for Vert.x EventBus in Clustered-Mode: localConsumer
package ch.oli;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.ext.cluster.infinispan.InfinispanClusterManager;
/**
 * Reproducer for a throughput difference between {@code EventBus.consumer(...)} and
 * {@code EventBus.localConsumer(...)} when Vert.x runs with a clustered event bus.
 *
 * <p>The program starts a clustered Vert.x instance, registers one consumer on a fixed address,
 * and floods that address from a periodic timer. The consumer prints the measured number of
 * received messages per second to standard output, roughly once per second.
 */
public class ReproduceSlowVertxLocalConsumer {

    /** Event-bus address shared by the sender and the consumer. */
    private static final String ADDRESS = "gugus";

    /** Payload of every message sent. */
    private static final String PAYLOAD = "Burn, EventBus, Burn !";

    /** Timer period in milliseconds: 10 ms means 100 bursts per second. */
    private static final long BURST_PERIOD_MILLIS = 10L;

    /** Number of messages sent per timer tick. */
    private static final int MESSAGES_PER_BURST = 1_000;

    private static final long NANOS_PER_SECOND = 1_000_000_000L;

    /** {@code System.nanoTime()} value at which the current measurement window started. */
    static long startNanos = 0L;

    /** Messages received since {@link #startNanos}. */
    static long count = 0L;

    /** Becomes {@code true} once the first message has opened the first measurement window. */
    private static boolean windowOpen = false;

    /**
     * Starts Vert.x in Infinispan-clustered mode and launches the load test once it is up.
     *
     * <p>Note from the original author: two or more nodes are not needed, the problem also
     * occurs in 'single-node' mode (but a clustered EventBus is required).
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ClusterManager clusterManager = new InfinispanClusterManager();
        VertxOptions vertxOptions = new VertxOptions().setClusterManager(clusterManager);
        Vertx.clusteredVertx(vertxOptions, res -> {
            // Without this check a failed cluster start would surface as a
            // NullPointerException inside go(), hiding the actual cause.
            if (res.failed()) {
                System.err.println("Could not start clustered Vert.x: " + res.cause());
                return;
            }
            go(res.result());
        });
    }

    /**
     * Registers the measuring consumer and starts the periodic sender.
     *
     * @param vertx the started (clustered) Vert.x instance
     */
    private static void go(Vertx vertx) {
        EventBus eb = vertx.eventBus();

        //////////////////////////////////////////////////////////////////////////
        // Toggle between eb.consumer(...) and eb.localConsumer(...) and watch the
        // measured throughput. Original author's observation: 100'000/s is reached
        // without problems with consumer(...), but only ~1'500/s with
        // localConsumer(...), together with high CPU and memory load.
        //////////////////////////////////////////////////////////////////////////
        // eb.localConsumer(ADDRESS, msg -> {
        eb.consumer(ADDRESS, msg -> {
            measure();
        });

        // 1'000 messages, 100 times per second --> 100'000 per second
        vertx.setPeriodic(BURST_PERIOD_MILLIS, id -> {
            for (int i = 0; i < MESSAGES_PER_BURST; i++) {
                eb.send(ADDRESS, PAYLOAD);
            }
        });
    }

    /**
     * Counts one received message and prints the throughput (messages per second) once more
     * than a second has elapsed in the current window.
     *
     * <p>The first message only opens the first window and is not counted. This avoids
     * measuring against the arbitrary (possibly negative) origin of {@code System.nanoTime()}.
     *
     * <p>NOTE(review): not synchronized; this assumes the single registered handler is always
     * invoked on the same thread. Confirm before registering further consumers.
     */
    private static void measure() {
        // Read the clock once so that no time is lost between computing the rate
        // and opening the next window.
        long now = System.nanoTime();
        if (!windowOpen) {
            windowOpen = true;
            startNanos = now;
            count = 0L;
            return;
        }
        count++;
        long nanos = now - startNanos;
        if (nanos > NANOS_PER_SECOND) { // print once per second
            System.out.println(count * NANOS_PER_SECOND / nanos);
            startNanos = now;
            count = 0L;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment