Skip to content

Instantly share code, notes, and snippets.

@neworld
Created March 9, 2021 09:17
Show Gist options
  • Select an option

  • Save neworld/3f7d5b2d2ecb9aaee156f41684de0ff1 to your computer and use it in GitHub Desktop.

Select an option

Save neworld/3f7d5b2d2ecb9aaee156f41684de0ff1 to your computer and use it in GitHub Desktop.

Revisions

  1. neworld created this gist Mar 9, 2021.
    47 changes: 47 additions & 0 deletions BreakDisposeChainObservable.kt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,47 @@
    package com.vinted.feature.base.ui.rx

    import io.reactivex.Observable
    import io.reactivex.Observer
    import io.reactivex.disposables.Disposable

    /**
    *
    * Breaks dispose chain. Upstream will be not be disposed.
    * This class should be used in cases where you need guarantee upstream will be executed at any price.
    * For example network call, those must be delivered
    */
    class BreakDisposeChainObservable<T: Any>(private val upstream: Observable<T>) : Observable<T>() {
    override fun subscribeActual(observer: Observer<in T>) {
    upstream.subscribe(BreakDisposeChainObserver(observer))
    }

    class BreakDisposeChainObserver<T>(observer: Observer<T>) : Observer<T>, Disposable {

    @Volatile
    private var observer: Observer<T>? = observer

    override fun onSubscribe(disposable: Disposable) {
    observer!!.onSubscribe(this)
    }

    override fun onComplete() {
    observer?.onComplete()
    }

    override fun onNext(value: T) {
    observer?.onNext(value)
    }

    override fun onError(error: Throwable) {
    observer?.onError(error)
    }

    override fun isDisposed() = observer == null

    override fun dispose() {
    observer = null
    }
    }
    }

    fun <T: Any> Observable<T>.breakDisposeChain(): Observable<T> = BreakDisposeChainObservable(this)
    119 changes: 119 additions & 0 deletions BreakDisposeChainObservableTest.kt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,119 @@
    package com.vinted.data.rx.api

    import com.vinted.feature.base.ui.rx.BreakDisposeChainObservable
    import io.reactivex.Observable
    import io.reactivex.Observer
    import io.reactivex.disposables.Disposable
    import io.reactivex.subjects.PublishSubject
    import org.junit.Test
    import kotlin.test.assertTrue

    class BreakDisposeChainObservableTest {
    @Test
    fun complete_notDisposed_pass() {
    val upstream = Observable.empty<Unit>()

    val fixture = BreakDisposeChainObservable(upstream)

    fixture.test().assertComplete()
    }

    @Test
    fun next_notDisposed_pass() {
    val upstream = Observable.just(5)

    val fixture = BreakDisposeChainObservable(upstream)

    fixture.test().assertValue(5)
    }

    @Test
    fun error_notDisposed_pass() {
    val exception = RuntimeException("test")
    val upstream = Observable.error<Unit>(exception)

    val fixture = BreakDisposeChainObservable(upstream)

    fixture.test().assertError(exception)
    }

    @Test
    fun dispose_upstreamShouldBeNot_disposed() {
    val upstream = PublishSubject.create<Unit>()
    val fixture = BreakDisposeChainObservable(upstream)

    fixture.subscribe().dispose()

    assertTrue(upstream.hasObservers())
    }

    @Test
    fun complete_disposed_notPass() {
    val upstream = PublishSubject.create<Unit>()

    val fixture = BreakDisposeChainObservable(upstream)
    val test = fixture.test()
    test.dispose()
    upstream.onComplete()

    test.assertNotComplete()
    }

    @Test
    fun next_disposed_notPass() {
    val upstream = PublishSubject.create<Unit>()

    val fixture = BreakDisposeChainObservable(upstream)
    val test = fixture.test()
    test.dispose()
    upstream.onNext(Unit)

    test.assertNoValues()
    }

    @Test
    fun error_disposed_notPass() {
    val upstream = PublishSubject.create<Unit>()

    val fixture = BreakDisposeChainObservable(upstream)
    val test = fixture.test()
    test.dispose()
    upstream.onError(RuntimeException())

    test.assertNoErrors()
    }

    @Test
    fun dispose_isDisposed() {
    val fixture = BreakDisposeChainObservable(Observable.never<Unit>())
    val disposableTracker = ParentDisposableDelegate()

    fixture.subscribe(disposableTracker)
    disposableTracker.dispose()

    assertTrue(disposableTracker.isDisposed)
    }

    class ParentDisposableDelegate : Observer<Unit>, Disposable {
    private lateinit var parentDisposable: Disposable

    override fun onError(p0: Throwable) {
    }

    override fun onComplete() {
    }

    override fun onNext(p0: Unit) {
    }

    override fun onSubscribe(disposable: Disposable) {
    parentDisposable = disposable
    }

    override fun isDisposed() = parentDisposable.isDisposed

    override fun dispose() {
    parentDisposable.dispose()
    }
    }
    }