Last active
April 3, 2025 09:42
-
-
Save dhsrocha/8efedf2513e773ea4df306b9339a880a to your computer and use it in GitHub Desktop.
Thread-safe publish-subscribe mechanism. It allows subscribers to register themselves for event notifications without preventing garbage collection.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.lang.ref.Reference; | |
import java.lang.ref.ReferenceQueue; | |
import java.lang.ref.WeakReference; | |
import java.util.ArrayList; | |
import java.util.Collection; | |
import java.util.List; | |
import java.util.Objects; | |
import java.util.concurrent.CompletableFuture; | |
import java.util.concurrent.CopyOnWriteArrayList; | |
import java.util.concurrent.CopyOnWriteArraySet; | |
import java.util.concurrent.ExecutorService; | |
import java.util.function.Consumer; | |
import java.util.function.Function; | |
/**
 * A Broker class that manages subscriptions and publishing of events to subscribers.
 *
 * <p>Subscribers are held through {@link WeakReference}s, so the broker never keeps a subscriber
 * alive on its own: callers must retain a strong reference to every consumer they register,
 * otherwise it may be garbage collected and silently stop receiving events.
 *
 * <p>Internal state lives in copy-on-write collections. Once any subscriber fails, the failure is
 * recorded and every subsequent {@code subscribe}, {@code unsubscribe} and {@code apply} call
 * throws {@link IllegalStateException}; this class offers no way to reset that state.
 *
 * @param <E> the type of the event record.
 * @author <a href="mailto:[email protected]">Diego Rocha</a>
 */
final class Broker<E extends Record> implements Function<E, CompletableFuture<Void>> {

  /** Failures reported by subscriber tasks; a non-empty collection marks the broker as dirty. */
  private final Collection<Throwable> exceptions = new CopyOnWriteArraySet<>();

  /** Queue the subscriber references are registered with; polled to detect collected subscribers. */
  private final ReferenceQueue<Consumer<E>> refs = new ReferenceQueue<>();

  /** Weakly held subscribers, in subscription order. */
  private final Collection<WeakReference<Consumer<E>>> subscribers = new CopyOnWriteArrayList<>();

  /** Executor on which every subscriber invocation is scheduled. */
  private final ExecutorService exec;

  /**
   * Constructs a Broker instance with the specified ExecutorService.
   *
   * @param exec the ExecutorService to use for executing subscriber tasks.
   * @throws NullPointerException if the provided executor is null.
   */
  Broker(final ExecutorService exec) {
    this.exec = Objects.requireNonNull(exec, "Executor cannot be null");
  }

  /**
   * Renders the currently reachable subscribers and the recorded exceptions.
   *
   * @return a diagnostic description of this broker.
   */
  @Override
  public String toString() {
    return "Broker{subscribers="
        // Skip referents that were already collected but not yet pruned, instead of printing "null".
        + this.subscribers.stream().map(Reference::get).filter(Objects::nonNull).toList()
        + ", exceptions="
        + this.exceptions
        + '}';
  }

  /**
   * Subscribes a new consumer to receive events.
   *
   * <p>The consumer is stored weakly; the caller is responsible for keeping it strongly reachable
   * for as long as it should receive events.
   *
   * @param subscriber the consumer that will receive events.
   * @throws NullPointerException if the provided subscriber is null.
   * @throws IllegalStateException if the broker has recorded exceptions.
   */
  void subscribe(final Consumer<E> subscriber) {
    final var valid = Objects.requireNonNull(subscriber, "Subscriber cannot be null");
    checkIfDirty();
    cleanup();
    this.subscribers.add(new WeakReference<>(valid, this.refs));
  }

  /**
   * Unsubscribes a consumer from receiving events.
   *
   * <p>Every registration whose referent equals the given consumer is removed.
   *
   * @param subscriber the consumer to be removed from the subscription list.
   * @throws IllegalStateException if the broker has recorded exceptions.
   */
  void unsubscribe(final Consumer<E> subscriber) {
    checkIfDirty();
    cleanup();
    this.subscribers.removeIf(ref -> Objects.equals(ref.get(), subscriber));
  }

  /**
   * Publishes an event to all subscribed consumers.
   *
   * @param event the event to be published.
   * @return a CompletableFuture that completes when all subscribers have processed the event.
   * @throws NullPointerException if the provided event is null.
   * @throws IllegalStateException if the broker has recorded exceptions.
   */
  @Override
  public CompletableFuture<Void> apply(final E event) {
    Objects.requireNonNull(event, "Event cannot be null");
    checkIfDirty();
    cleanup();
    final var tasks = new ArrayList<CompletableFuture<Void>>();
    for (final var sub : this.subscribers) {
      // Hold the referent strongly while scheduling so it cannot vanish between check and use.
      final var target = sub.get();
      if (target != null) {
        tasks.add(task(event, target));
      }
    }
    // Wildcard-typed array avoids the raw type while still matching allOf(CompletableFuture<?>...).
    return CompletableFuture.allOf(tasks.toArray(new CompletableFuture<?>[0]));
  }

  /**
   * Retrieves the collection of exceptions that have occurred during event processing.
   *
   * <p>Entries are stored exactly as handed to the completion callback of each task, so they may
   * be wrapped (e.g. in a {@code CompletionException}) rather than being the raw subscriber error.
   *
   * @return an unmodifiable snapshot of the Throwables recorded so far.
   */
  Collection<Throwable> exceptions() {
    return List.copyOf(this.exceptions);
  }

  /**
   * Cleans up weak references to subscribers that have been garbage collected.
   *
   * <p>The reference queue is drained first and the subscriber list is pruned at most once,
   * instead of rescanning the copy-on-write list for every queued reference.
   */
  private void cleanup() {
    boolean stale = false;
    while (this.refs.poll() != null) {
      stale = true;
    }
    if (stale) {
      this.subscribers.removeIf(ref -> ref.get() == null);
    }
  }

  /**
   * Checks if the broker is in a dirty state due to exceptions.
   *
   * @throws IllegalStateException if there are collected exceptions.
   */
  private void checkIfDirty() {
    if (!this.exceptions.isEmpty()) {
      throw new IllegalStateException("Broker is in a dirty state due to previous exceptions");
    }
  }

  /**
   * Creates a CompletableFuture that runs the subscriber's accept method asynchronously.
   *
   * <p>Any failure is recorded in {@link #exceptions} before the returned future completes, and
   * the returned future still completes exceptionally.
   *
   * @param event the event to be processed.
   * @param ref the consumer that will process the event.
   * @return a CompletableFuture representing the asynchronous task.
   */
  private CompletableFuture<Void> task(final E event, final Consumer<E> ref) {
    return CompletableFuture.runAsync(() -> ref.accept(event), this.exec)
        .whenComplete(
            (v, ex) -> {
              if (ex != null) {
                this.exceptions.add(ex);
              }
            });
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment