Created
March 9, 2021 09:17
-
-
Save neworld/3f7d5b2d2ecb9aaee156f41684de0ff1 to your computer and use it in GitHub Desktop.
Revisions
-
neworld created this gist
Mar 9, 2021 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,47 @@ package com.vinted.feature.base.ui.rx import io.reactivex.Observable import io.reactivex.Observer import io.reactivex.disposables.Disposable /** * * Breaks dispose chain. Upstream will be not be disposed. * This class should be used in cases where you need guarantee upstream will be executed at any price. * For example network call, those must be delivered */ class BreakDisposeChainObservable<T: Any>(private val upstream: Observable<T>) : Observable<T>() { override fun subscribeActual(observer: Observer<in T>) { upstream.subscribe(BreakDisposeChainObserver(observer)) } class BreakDisposeChainObserver<T>(observer: Observer<T>) : Observer<T>, Disposable { @Volatile private var observer: Observer<T>? = observer override fun onSubscribe(disposable: Disposable) { observer!!.onSubscribe(this) } override fun onComplete() { observer?.onComplete() } override fun onNext(value: T) { observer?.onNext(value) } override fun onError(error: Throwable) { observer?.onError(error) } override fun isDisposed() = observer == null override fun dispose() { observer = null } } } fun <T: Any> Observable<T>.breakDisposeChain(): Observable<T> = BreakDisposeChainObservable(this) This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,119 @@ package com.vinted.data.rx.api import com.vinted.feature.base.ui.rx.BreakDisposeChainObservable import io.reactivex.Observable import io.reactivex.Observer import io.reactivex.disposables.Disposable import io.reactivex.subjects.PublishSubject import org.junit.Test import kotlin.test.assertTrue class BreakDisposeChainObservableTest { @Test fun complete_notDisposed_pass() { val upstream = Observable.empty<Unit>() val fixture = BreakDisposeChainObservable(upstream) fixture.test().assertComplete() } @Test fun next_notDisposed_pass() { val upstream = Observable.just(5) val fixture = BreakDisposeChainObservable(upstream) fixture.test().assertValue(5) } @Test fun error_notDisposed_pass() { val exception = RuntimeException("test") val upstream = Observable.error<Unit>(exception) val fixture = BreakDisposeChainObservable(upstream) fixture.test().assertError(exception) } @Test fun dispose_upstreamShouldBeNot_disposed() { val upstream = PublishSubject.create<Unit>() val fixture = BreakDisposeChainObservable(upstream) fixture.subscribe().dispose() assertTrue(upstream.hasObservers()) } @Test fun complete_disposed_notPass() { val upstream = PublishSubject.create<Unit>() val fixture = BreakDisposeChainObservable(upstream) val test = fixture.test() test.dispose() upstream.onComplete() test.assertNotComplete() } @Test fun next_disposed_notPass() { val upstream = PublishSubject.create<Unit>() val fixture = BreakDisposeChainObservable(upstream) val test = fixture.test() test.dispose() upstream.onNext(Unit) test.assertNoValues() } @Test fun error_disposed_notPass() { val upstream = PublishSubject.create<Unit>() val fixture = BreakDisposeChainObservable(upstream) val test = fixture.test() test.dispose() upstream.onError(RuntimeException()) test.assertNoErrors() } @Test fun dispose_isDisposed() { val fixture = BreakDisposeChainObservable(Observable.never<Unit>()) val disposableTracker = ParentDisposableDelegate() fixture.subscribe(disposableTracker) disposableTracker.dispose() assertTrue(disposableTracker.isDisposed) } class ParentDisposableDelegate : Observer<Unit>, Disposable { private lateinit var parentDisposable: Disposable override fun onError(p0: Throwable) { } override fun onComplete() { } override fun onNext(p0: Unit) { } override fun onSubscribe(disposable: Disposable) { parentDisposable = disposable } override fun isDisposed() = parentDisposable.isDisposed override fun dispose() { parentDisposable.dispose() } } }