Last active
October 1, 2017 07:35
-
-
Save jemshit/c8fd40180519b6df06ae598d123e1cfd to your computer and use it in GitHub Desktop.
Revisions
-
jemshit renamed this gist
Oct 1, 2017 . 1 changed file with 0 additions and 0 deletions.There are no files selected for viewing
File renamed without changes. -
jemshit created this gist
Oct 1, 2017 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,78 @@ import com.jakewharton.rx.ReplayingShare; import com.jakewharton.rxrelay2.PublishRelay; import com.jakewharton.rxrelay2.Relay; import io.reactivex.Observable; import java.util.concurrent.TimeUnit; public class HotReactiveData { // Problems to solve: // 1- UI components should be notified when data is updated // 2- Offline support, presentation layer always listens to local data source. So whenever DB is updated, UI should be notified. // 3- Error propagation to UI when ... // 4- Sometimes get fresh data from API private static class LocalDataSource { private String profile = "John"; Observable<String> getProfile() { return Observable.just(profile); } void setProfile(String profile) { this.profile = profile; } } private static class ProfileRepository { private LocalDataSource localDataSource = new LocalDataSource(); private Relay<String> relay = PublishRelay.<String>create().toSerialized(); // replaying share has .replay(1) so no need for BehaviourRelay Observable<String> getProfile() { return localDataSource.getProfile() .mergeWith(relay) .compose(ReplayingShare.instance()) // .replay(1).publish().refCount() .distinctUntilChanged(); // to not to get same item (state) twice } void updateProfileLocalOnly(String profile) { localDataSource.setProfile(profile); relay.accept(profile); } } public static void main(String[] args) throws InterruptedException { ProfileRepository repository = new ProfileRepository(); // Listen to profile repository.getProfile() .subscribe(profile -> System.out.println("Observer1 got profile: " + profile)); // Update profile from somewhere Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS) // start, count, initialDelay, period, unit .doOnNext(integer -> System.out.println("Updating profile from some place")) .subscribe(integer -> repository.updateProfileLocalOnly("Agent " + integer)); // Second Observer listens to the same data after a delay. Should get last item emitted + new updates Observable.defer(() -> repository.getProfile()) .delaySubscription(2, TimeUnit.SECONDS) // delays subscription (for only cold observable i guess) .subscribe(profile -> System.out.println("Observer2 got profile: " + profile)); Thread.sleep(5000); // Prints: /* Observer1 got profile: John Updating profile from some place Observer1 got profile: Agent 0 Updating profile from some place Observer1 got profile: Agent 1 Observer2 got profile: Agent 1 Updating profile from some place Observer1 got profile: Agent 2 Observer2 got profile: Agent 2 */ } }