Skip to content

Instantly share code, notes, and snippets.

@clemp6r
Last active March 3, 2016 14:25
Show Gist options
  • Save clemp6r/bd8da18f70c15ea15a61 to your computer and use it in GitHub Desktop.
Save clemp6r/bd8da18f70c15ea15a61 to your computer and use it in GitHub Desktop.

Revisions

  1. clemp6r revised this gist Mar 3, 2016. 1 changed file with 1 addition and 0 deletions.
    1 change: 1 addition & 0 deletions ReactiveStore.kt
    Original file line number Diff line number Diff line change
    @@ -43,6 +43,7 @@ abstract class ReactiveStore<T> {
    onNewValue(it)
    },
    {
    // TODO wrap errors into values without stopping the observable
    bus.onError(it)
    bus = PublishSubject.create<T>()
    }
  2. clemp6r created this gist Mar 3, 2016.
    62 changes: 62 additions & 0 deletions ReactiveStore.kt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,62 @@
    package com.wizbii.wizbiiandroid.services

    import rx.Observable
    import rx.subjects.PublishSubject

    /**
    * POC of a reactive data store.
    */
    abstract class ReactiveStore<T> {

    /**
    * Local version of the data.
    */
    private var value: T? = null

    /**
    * Bus for notifying new versions of the data to all subscribers.
    */
    private var bus = PublishSubject.create<T>()

    /**
    * Observable for fetching the remote version of the data.
    */
    protected abstract fun getRemoteData(): Observable<T>

    /**
    * Updates the remote data and emits the updated value.
    */
    protected abstract fun updateRemoteData(parameters: Any): Observable<T>

    /**
    * Infinite stream of value updates.
    */
    fun values(): Observable<T> {
    return Observable.create {
    if (value != null) {
    it.onNext(value)
    bus.subscribe(it)
    } else {
    bus.subscribe(it)
    getRemoteData().subscribe(
    {
    onNewValue(it)
    },
    {
    bus.onError(it)
    bus = PublishSubject.create<T>()
    }
    )
    }
    }
    }

    private fun onNewValue(it: T) {
    value = it
    bus.onNext(it)
    }

    fun updateValue(parameters: Any): Observable<T> {
    return updateRemoteData(parameters).doOnNext { onNewValue(it) }
    }
    }