-
-
Save vinaysshenoy/8171c843da44f66971da6a1e41380b53 to your computer and use it in GitHub Desktop.
EventBus using RxJava. Bunch of subjects and observers.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import rx.Observable; | |
import rx.Subscriber; | |
import rx.Subscription; | |
import rx.functions.Action0; | |
import rx.subjects.PublishSubject; | |
import rx.subjects.SerializedSubject; | |
/** | |
* An object reference of EventBus | |
* courtesy https://gist.github.com/benjchristensen/04eef9ca0851f3a5d7bf | |
* | |
* @hide | |
*/ | |
public class EventBus { | |
SerializedSubject<Object, Object> serializedSubject = new SerializedSubject<>(PublishSubject.create()); | |
static EventBus singleton = null; | |
EventBus() { | |
} | |
public static EventBus get() { | |
if (singleton == null) { | |
synchronized (EventBus.class) { | |
if (singleton == null) { | |
singleton = new EventBus(); | |
} | |
} | |
} | |
return singleton; | |
} | |
/** | |
* Send events. | |
*/ | |
public <T> void send(T t) { | |
serializedSubject.onNext(t); | |
} | |
/** | |
* Observe events sent. | |
*/ | |
public <T> Observable<T> observable(final Class<T> cls) { | |
final Subscription[] holder = new Subscription[1]; | |
Observable<T> obs = Observable.create(new Observable.OnSubscribe<T>() { | |
@Override | |
public void call(final Subscriber<? super T> subscriber) { | |
Subscription subscription = serializedSubject.subscribe(new Subscriber<Object>() { | |
@Override | |
public void onCompleted() { | |
subscriber.onCompleted(); | |
} | |
@Override | |
public void onError(Throwable e) { | |
subscriber.onError(e); | |
} | |
@SuppressWarnings("unchecked") | |
@Override | |
public void onNext(Object o) { | |
if (cls.equals(o.getClass())) { | |
subscriber.onNext((T) o); | |
} | |
} | |
}); | |
holder[0] = subscription; | |
} | |
}).doOnUnsubscribe(new Action0() { | |
@Override | |
public void call() { | |
if (holder[0] != null && !holder[0].isUnsubscribed()) | |
holder[0].unsubscribe(); | |
} | |
}); | |
return obs; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import rx.Subscription; | |
import rx.functions.Action1; | |
/** | |
* Sample EventBus | |
*/ | |
public class Sample { | |
static class Event { | |
int data; | |
public Event(int data) { | |
this.data = data; | |
} | |
} | |
static class AnotherEvent { | |
int data; | |
public AnotherEvent(int data) { | |
this.data = data; | |
} | |
} | |
static class ExtendedEvent extends Event { | |
public ExtendedEvent(int data) { | |
super(data); | |
} | |
} | |
public static void main(String[] args) { | |
//subscribe to receive events | |
Subscription subscription1 = EventBus | |
.get() | |
.observable(Event.class) | |
.subscribe(new Action1<Event>() { | |
@Override | |
public void call(Event myEvent) { | |
System.out.println("Event : " + myEvent.data); | |
} | |
}); | |
Subscription subscription2 = EventBus.get() | |
.observable(AnotherEvent.class) | |
.subscribe(new Action1<AnotherEvent>() { | |
@Override | |
public void call(AnotherEvent anotherEvent) { | |
System.out.println("AnotherEvent : " + anotherEvent.data); | |
} | |
}); | |
Subscription subscription3 = EventBus.get() | |
.observable(ExtendedEvent.class) | |
.subscribe(new Action1<ExtendedEvent>() { | |
@Override | |
public void call(ExtendedEvent extendedEvent) { | |
System.out.println("ExtendedEvent : " + extendedEvent.data); | |
} | |
}); | |
//send events | |
for (int i = 1; i < 10; i++) { | |
EventBus.get().send(new Event(i)); | |
EventBus.get().send(new AnotherEvent(i)); | |
EventBus.get().send(new ExtendedEvent(i)); | |
} | |
unsubscribe(subscription1); | |
unsubscribe(subscription2); | |
unsubscribe(subscription3); | |
} | |
static void unsubscribe(Subscription subscription) { | |
if (subscription != null && !subscription.isUnsubscribed()) { | |
subscription.unsubscribe(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment