Created
January 10, 2023 22:17
-
-
Save ennerf/d78eab936d9f4f53dca9ef9170d9baa1 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventPoller;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.util.Objects;
import javafx.animation.AnimationTimer;
/** | |
* Uses a JavaFX AnimationTimer that gets called once per frame to poll | |
* events from a Disruptor. This automatically batches events down to the | |
* frame rate and can remove other synchronization code for getting data | |
* into the FX thread. | |
* | |
* @author Florian Enner | |
* @since 24 Feb 2021 | |
*/ | |
public class DisruptorFx { | |
public static void example() { | |
var disruptor = new Disruptor<>(FeedbackEvent::new, 1024, DaemonThreadFactory.INSTANCE); | |
// consume on the FXAT | |
var poller = DisruptorFx.newFxPoller(disruptor, ((event, sequence, endOfBatch) -> { | |
// render something | |
System.out.println(event.position); | |
})); | |
// start | |
disruptor.start(); | |
poller.start(); | |
// produce data on some other thread | |
var buffer = disruptor.getRingBuffer(); | |
for (int i = 0; i < 100; i++) { | |
long sequence = buffer.next(); | |
var event = buffer.get(sequence); | |
event.position = i; | |
buffer.publish(sequence); | |
} | |
} | |
static class FeedbackEvent { | |
double position = Double.NaN; | |
} | |
public static <T> FxPoller<T> newFxPoller(Disruptor<T> disruptor, EventHandler<T> handler) { | |
return newFxPoller(disruptor, (event, sequence, endOfBatch) -> { | |
handler.onEvent(event, sequence, endOfBatch); | |
return true; // false would exit the batch early (e.g. timeout or to avoid blocking a writer) | |
}); | |
} | |
public static <T> FxPoller<T> newFxPoller(Disruptor<T> disruptor, EventPoller.Handler<T> handler) { | |
return new FxPoller<>(disruptor.getRingBuffer().newPoller(), handler); | |
} | |
static class FxPoller<T> extends AnimationTimer { | |
FxPoller(EventPoller<T> poller, EventPoller.Handler<T> handler) { | |
this.poller = poller; | |
this.handler = handler; | |
} | |
final EventPoller<T> poller; | |
final EventPoller.Handler<T> handler; | |
@Override | |
public void handle(long now) { | |
try { | |
// A single call takes all currently available sequences and | |
// processes them in a batch. Handlers may return early by | |
// returning false. | |
poller.poll(handler); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment