Last active
May 7, 2017 06:17
-
-
Save AliYusuf95/d7bc5e2f0b1f9871b6d773d2fd884aad to your computer and use it in GitHub Desktop.
RxJava event bus implementation with subscribing and unsubscribing.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import android.support.annotation.NonNull; | |
import java.util.HashMap; | |
import java.util.Map; | |
import io.reactivex.Observable; | |
import io.reactivex.android.schedulers.AndroidSchedulers; | |
import io.reactivex.disposables.CompositeDisposable; | |
import io.reactivex.disposables.Disposable; | |
import io.reactivex.functions.Consumer; | |
import io.reactivex.functions.Predicate; | |
import io.reactivex.subjects.PublishSubject; | |
/** | |
* Used for subscribing to and publishing to subjects. Allowing you to send data between activities, fragments, etc. | |
*/ | |
public final class RxBus { | |
private static final String TAG = RxBus.class.getSimpleName(); | |
private static PublishSubject<Object> bus; | |
private static Map<Object, CompositeDisposable> sSubscriptionsMap = new HashMap<>(); | |
private RxBus() { | |
// Empty constructor | |
} | |
/** | |
* Get bus object or create it if it's not already in memory. | |
*/ | |
private static PublishSubject<Object> getBus(){ | |
if (bus == null){ | |
bus = PublishSubject.create(); | |
bus.subscribeOn(AndroidSchedulers.mainThread()); | |
} | |
return bus; | |
} | |
/** | |
* Get the CompositeDisposable or create it if it's not already in memory. | |
*/ | |
@NonNull | |
private static CompositeDisposable getCompositeSubscription(@NonNull Object object) { | |
CompositeDisposable compositeSubscription = sSubscriptionsMap.get(object); | |
if (compositeSubscription == null) { | |
compositeSubscription = new CompositeDisposable(); | |
sSubscriptionsMap.put(object, compositeSubscription); | |
} | |
return compositeSubscription; | |
} | |
/** | |
* Subscribe to the specified event and listen for updates on that event. Pass in an event class | |
* to associate your registration with, so that you can unsubscribe later. | |
* | |
* Note: Make sure to call {@link RxBus#unregister(Object)} to avoid memory leaks. | |
*/ | |
public static <T> Disposable subscribe(@NonNull Object lifecycle, @NonNull final Class<T> eventClass, @NonNull Consumer<T> action) { | |
Disposable subscription = getBus() | |
.filter(new Predicate<Object>() { | |
@Override | |
public boolean test(@io.reactivex.annotations.NonNull Object o) throws Exception { | |
return o != null; // Filter out null objects, better safe than sorry | |
} | |
}) | |
.filter(new Predicate<Object>() { | |
@Override | |
public boolean test(@io.reactivex.annotations.NonNull Object o) throws Exception { | |
return eventClass.isInstance(o); // check if same event class | |
} | |
}) | |
.cast(eventClass) // Cast it for easier usage | |
.subscribe(action); | |
getCompositeSubscription(lifecycle).add(subscription); | |
return subscription; | |
} | |
/** | |
* Get {@link Observable} to a specified event class. | |
* | |
* Note: If you make a subscription to the observable make sure to call {@link Disposable#dispose()} | |
* to avoid memory leaks. | |
*/ | |
public static <T> Observable<T> observable(@NonNull final Class<T> eventClass) { | |
return getBus() | |
.filter(new Predicate<Object>() { | |
@Override | |
public boolean test(@io.reactivex.annotations.NonNull Object o) throws Exception { | |
return o != null; // Filter out null objects, better safe than sorry | |
} | |
}) | |
.filter(new Predicate<Object>() { | |
@Override | |
public boolean test(@NonNull Object o) throws Exception { | |
return eventClass.isInstance(o); // check if same event class | |
} | |
}) | |
.cast(eventClass); // Cast it for easier usage | |
} | |
/** | |
* Unregisters this object from the bus, removing all subscriptions. | |
* This should be called when the object is going to go out of memory. | |
*/ | |
public static void unregister(@NonNull Object lifecycle) { | |
//We have to remove the composition from the map, because once you unsubscribe it can't be used anymore | |
CompositeDisposable compositeSubscription = sSubscriptionsMap.remove(lifecycle); | |
if (compositeSubscription != null) { | |
compositeSubscription.dispose(); | |
} | |
} | |
/** | |
* Post an event for all subscribers of that event. | |
*/ | |
public static <T> void post(@NonNull T event) { | |
if (getBus().hasObservers()){ | |
getBus().onNext(event); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Example usage | |
*/ | |
public class RxBusActivity extends AppCompatActivity { | |
@Override | |
protected void onCreate(@Nullable Bundle savedInstanceState) { | |
super.onCreate(savedInstanceState); | |
// Subscribe this activity in Event | |
RxBus.subscribe(this, Event.class, new Consumer<Event>() { | |
@Override | |
public void accept(Event event) throws Exception { | |
} | |
}); | |
// Post an event in the bus | |
RxBus.post(new Event()); | |
} | |
@Override | |
protected void onDestroy() { | |
super.onDestroy(); | |
// Unsubscribe this activity to avoid memory leak. | |
RxBus.unsubscribe(this); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment