Created
March 9, 2021 09:17
-
-
Save neworld/3f7d5b2d2ecb9aaee156f41684de0ff1 to your computer and use it in GitHub Desktop.
RxJava operator written in Kotlin that breaks the dispose chain
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.vinted.feature.base.ui.rx | |
import io.reactivex.Observable | |
import io.reactivex.Observer | |
import io.reactivex.disposables.Disposable | |
/**
 * An [Observable] wrapper that breaks the dispose chain: disposing the downstream
 * subscription never disposes [upstream].
 *
 * Use it when the upstream must keep running whether or not anybody is still
 * listening, for example a network call that has to be delivered.
 * Once the downstream has disposed, further upstream events are dropped.
 */
class BreakDisposeChainObservable<T : Any>(private val upstream: Observable<T>) : Observable<T>() {

    /** Subscribes to [upstream] through a relay that hides the upstream disposable from [observer]. */
    override fun subscribeActual(observer: Observer<in T>) {
        upstream.subscribe(BreakDisposeChainObserver(observer))
    }

    /**
     * Relays upstream events to the wrapped observer until [dispose] is called.
     *
     * The relay hands itself to the downstream as its [Disposable], so a downstream
     * dispose only drops the downstream reference and leaves the upstream subscription alone.
     */
    class BreakDisposeChainObserver<T>(observer: Observer<T>) : Observer<T>, Disposable {

        // Null once disposed. Volatile so that a dispose() issued on another thread
        // is visible to the callbacks below.
        @Volatile
        private var observer: Observer<T>? = observer

        override fun onSubscribe(disposable: Disposable) {
            // The upstream disposable is deliberately ignored; the downstream receives this relay.
            // Safe call (not `!!`): if the relay was already disposed, the call is skipped
            // like every other callback instead of throwing a NullPointerException.
            observer?.onSubscribe(this)
        }

        override fun onComplete() {
            observer?.onComplete()
        }

        override fun onNext(value: T) {
            observer?.onNext(value)
        }

        override fun onError(error: Throwable) {
            observer?.onError(error)
        }

        /** Returns `true` once [dispose] has dropped the downstream observer. */
        override fun isDisposed() = observer == null

        /** Drops the downstream observer; the upstream subscription is left untouched. */
        override fun dispose() {
            observer = null
        }
    }
}
/**
 * Wraps this [Observable] in a [BreakDisposeChainObservable], so that disposing the
 * returned stream does not dispose the receiver.
 */
fun <T : Any> Observable<T>.breakDisposeChain(): Observable<T> {
    return BreakDisposeChainObservable(this)
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.vinted.data.rx.api | |
import com.vinted.feature.base.ui.rx.BreakDisposeChainObservable | |
import io.reactivex.Observable | |
import io.reactivex.Observer | |
import io.reactivex.disposables.Disposable | |
import io.reactivex.subjects.PublishSubject | |
import org.junit.Test | |
import kotlin.test.assertTrue | |
/**
 * Tests for [BreakDisposeChainObservable]: events pass through while the downstream is
 * subscribed, are dropped after it disposes, and the upstream is never disposed.
 */
class BreakDisposeChainObservableTest {

    // --- events are relayed while the downstream is still subscribed ---

    @Test
    fun complete_notDisposed_pass() {
        BreakDisposeChainObservable(Observable.empty<Unit>())
            .test()
            .assertComplete()
    }

    @Test
    fun next_notDisposed_pass() {
        BreakDisposeChainObservable(Observable.just(5))
            .test()
            .assertValue(5)
    }

    @Test
    fun error_notDisposed_pass() {
        val failure = RuntimeException("test")
        val source = Observable.error<Unit>(failure)

        BreakDisposeChainObservable(source)
            .test()
            .assertError(failure)
    }

    // --- disposing the downstream must not reach the upstream ---

    @Test
    fun dispose_upstreamShouldBeNot_disposed() {
        val subject = PublishSubject.create<Unit>()

        val subscription = BreakDisposeChainObservable(subject).subscribe()
        subscription.dispose()

        assertTrue(subject.hasObservers())
    }

    // --- events emitted after the downstream disposed are dropped ---

    @Test
    fun complete_disposed_notPass() {
        val subject = PublishSubject.create<Unit>()
        val testObserver = BreakDisposeChainObservable(subject).test()

        testObserver.dispose()
        subject.onComplete()

        testObserver.assertNotComplete()
    }

    @Test
    fun next_disposed_notPass() {
        val subject = PublishSubject.create<Unit>()
        val testObserver = BreakDisposeChainObservable(subject).test()

        testObserver.dispose()
        subject.onNext(Unit)

        testObserver.assertNoValues()
    }

    @Test
    fun error_disposed_notPass() {
        val subject = PublishSubject.create<Unit>()
        val testObserver = BreakDisposeChainObservable(subject).test()

        testObserver.dispose()
        subject.onError(RuntimeException())

        testObserver.assertNoErrors()
    }

    @Test
    fun dispose_isDisposed() {
        val tracker = ParentDisposableDelegate()
        BreakDisposeChainObservable(Observable.never<Unit>()).subscribe(tracker)

        tracker.dispose()

        assertTrue(tracker.isDisposed)
    }

    /**
     * Observer that ignores every event and forwards its own [Disposable] calls to the
     * disposable it received in [onSubscribe].
     */
    class ParentDisposableDelegate : Observer<Unit>, Disposable {

        private lateinit var received: Disposable

        override fun onSubscribe(disposable: Disposable) {
            received = disposable
        }

        override fun onNext(value: Unit) = Unit

        override fun onError(error: Throwable) = Unit

        override fun onComplete() = Unit

        override fun dispose() = received.dispose()

        override fun isDisposed() = received.isDisposed
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment