Skip to content

Instantly share code, notes, and snippets.

@dhsrocha
Last active April 3, 2025 09:42
Show Gist options
  • Save dhsrocha/8efedf2513e773ea4df306b9339a880a to your computer and use it in GitHub Desktop.
Save dhsrocha/8efedf2513e773ea4df306b9339a880a to your computer and use it in GitHub Desktop.
Thread-safe publish-subscribe mechanism to It allows subscribers to register themselves for event notifications without preventing garbage collection.
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* A Broker class that manages subscriptions and publishing of events to subscribers.
*
* @param <E> the type of the event record.
* @author <a href="mailto:[email protected]">Diego Rocha</a>
*/
final class Broker<E extends Record> implements Function<E, CompletableFuture<Void>> {
private final Collection<Throwable> exceptions = new CopyOnWriteArraySet<>();
private final ReferenceQueue<Consumer<E>> refs = new ReferenceQueue<>();
private final Collection<WeakReference<Consumer<E>>> subscribers = new CopyOnWriteArrayList<>();
private final ExecutorService exec;
/**
* Constructs a Broker instance with the specified ExecutorService.
*
* @param exec the ExecutorService to use for executing subscriber tasks.
* @throws NullPointerException if the provided executor is null.
*/
Broker(final ExecutorService exec) {
this.exec = Objects.requireNonNull(exec, "Executor cannot be null");
}
@Override
public String toString() {
return "Broker{subscribers="
+ this.subscribers.stream().map(Reference::get).toList()
+ ", exceptions="
+ this.exceptions
+ '}';
}
/**
* Subscribes a new consumer to receive events.
*
* @param subscriber the consumer that will receive events.
* @throws NullPointerException if the provided subscriber is null.
*/
void subscribe(final Consumer<E> subscriber) {
final var valid = Objects.requireNonNull(subscriber, "Subscriber cannot be null");
checkIfDirty();
cleanup();
this.subscribers.add(new WeakReference<>(valid, this.refs));
}
/**
* Unsubscribes a consumer from receiving events.
*
* @param subscriber the consumer to be removed from the subscription list.
*/
void unsubscribe(final Consumer<E> subscriber) {
checkIfDirty();
cleanup();
this.subscribers.removeIf(ref -> Objects.equals(ref.get(), subscriber));
}
/**
* Publishes an event to all subscribed consumers.
*
* @param event the event to be published.
* @return a CompletableFuture that completes when all subscribers have processed the event.
* @throws NullPointerException if the provided event is null.
*/
@Override
public CompletableFuture<Void> apply(final E event) {
Objects.requireNonNull(event, "Event cannot be null");
checkIfDirty();
cleanup();
final var tasks = new ArrayList<CompletableFuture<Void>>();
for (final var sub : this.subscribers) {
final var ref = sub.get();
if (ref != null) {
tasks.add(task(event, ref));
}
}
return CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0]));
}
/**
* Retrieves the collection of exceptions that have occurred during event processing.
*
* @return a collection of Throwable representing exceptions that occurred.
*/
Collection<Throwable> exceptions() {
return List.copyOf(this.exceptions);
}
/** Cleans up weak references to subscribers that have been garbage collected. */
private void cleanup() {
while (this.refs.poll() != null) {
this.subscribers.removeIf(ref -> ref.get() == null);
}
}
/**
* Checks if the broker is in a dirty state due to exceptions.
*
* @throws IllegalStateException if there are collected exceptions.
*/
private void checkIfDirty() {
if (!this.exceptions.isEmpty()) {
throw new IllegalStateException("Broker is in a dirty state due to previous exceptions");
}
}
/**
* Creates a CompletableFuture that runs the subscriber's accept method asynchronously.
*
* @param event the event to be processed.
* @param ref the consumer that will process the event.
* @return a CompletableFuture representing the asynchronous task.
*/
private CompletableFuture<Void> task(final E event, final Consumer<E> ref) {
return CompletableFuture.runAsync(() -> ref.accept(event), this.exec)
.whenComplete(
(v, ex) -> {
if (ex != null) {
this.exceptions.add(ex);
}
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment