Last active
March 3, 2016 14:25
-
-
Save clemp6r/bd8da18f70c15ea15a61 to your computer and use it in GitHub Desktop.
Revisions
-
clemp6r revised this gist
Mar 3, 2016 . 1 changed file with 1 addition and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -43,6 +43,7 @@ abstract class ReactiveStore<T> { onNewValue(it) }, { // TODO wrap errors into values without stopping the observable bus.onError(it) bus = PublishSubject.create<T>() } -
clemp6r created this gist
Mar 3, 2016 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,62 @@ package com.wizbii.wizbiiandroid.services import rx.Observable import rx.subjects.PublishSubject /** * POC of a reactive data store. */ abstract class ReactiveStore<T> { /** * Local version of the data. */ private var value: T? = null /** * Bus for notifying new versions of the data to all subscribers. */ private var bus = PublishSubject.create<T>() /** * Observable for fetching the remote version of the data. */ protected abstract fun getRemoteData(): Observable<T> /** * Updates the remote data and emits the updated value. */ protected abstract fun updateRemoteData(parameters: Any): Observable<T> /** * Infinite stream of value updates. */ fun values(): Observable<T> { return Observable.create { if (value != null) { it.onNext(value) bus.subscribe(it) } else { bus.subscribe(it) getRemoteData().subscribe( { onNewValue(it) }, { bus.onError(it) bus = PublishSubject.create<T>() } ) } } } private fun onNewValue(it: T) { value = it bus.onNext(it) } fun updateValue(parameters: Any): Observable<T> { return updateRemoteData(parameters).doOnNext { onNewValue(it) } } }