Skip to content

Instantly share code, notes, and snippets.

@jemshit
Last active October 1, 2017 07:35
Show Gist options
  • Select an option

  • Save jemshit/c8fd40180519b6df06ae598d123e1cfd to your computer and use it in GitHub Desktop.

Select an option

Save jemshit/c8fd40180519b6df06ae598d123e1cfd to your computer and use it in GitHub Desktop.

Revisions

  1. jemshit renamed this gist Oct 1, 2017. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  2. jemshit created this gist Oct 1, 2017.
    78 changes: 78 additions & 0 deletions HotReactiveData
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,78 @@
    import com.jakewharton.rx.ReplayingShare;
    import com.jakewharton.rxrelay2.PublishRelay;
    import com.jakewharton.rxrelay2.Relay;
    import io.reactivex.Observable;

    import java.util.concurrent.TimeUnit;

    public class HotReactiveData {

    // Problems to solve:
    // 1- UI components should be notified when data is updated
    // 2- Offline support, presentation layer always listens to local data source. So whenever DB is updated, UI should be notified.
    // 3- Error propagation to UI when ...
    // 4- Sometimes get fresh data from API


    private static class LocalDataSource {
    private String profile = "John";

    Observable<String> getProfile() {
    return Observable.just(profile);
    }

    void setProfile(String profile) {
    this.profile = profile;
    }
    }

    private static class ProfileRepository {
    private LocalDataSource localDataSource = new LocalDataSource();
    private Relay<String> relay = PublishRelay.<String>create().toSerialized(); // replaying share has .replay(1) so no need for BehaviourRelay

    Observable<String> getProfile() {
    return localDataSource.getProfile()
    .mergeWith(relay)
    .compose(ReplayingShare.instance()) // .replay(1).publish().refCount()
    .distinctUntilChanged(); // to not to get same item (state) twice
    }

    void updateProfileLocalOnly(String profile) {
    localDataSource.setProfile(profile);
    relay.accept(profile);
    }
    }

    public static void main(String[] args) throws InterruptedException {
    ProfileRepository repository = new ProfileRepository();

    // Listen to profile
    repository.getProfile()
    .subscribe(profile -> System.out.println("Observer1 got profile: " + profile));

    // Update profile from somewhere
    Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS) // start, count, initialDelay, period, unit
    .doOnNext(integer -> System.out.println("Updating profile from some place"))
    .subscribe(integer -> repository.updateProfileLocalOnly("Agent " + integer));

    // Second Observer listens to the same data after a delay. Should get last item emitted + new updates
    Observable.defer(() -> repository.getProfile())
    .delaySubscription(2, TimeUnit.SECONDS) // delays subscription (for only cold observable i guess)
    .subscribe(profile -> System.out.println("Observer2 got profile: " + profile));

    Thread.sleep(5000);

    // Prints:
    /* Observer1 got profile: John
    Updating profile from some place
    Observer1 got profile: Agent 0
    Updating profile from some place
    Observer1 got profile: Agent 1
    Observer2 got profile: Agent 1
    Updating profile from some place
    Observer1 got profile: Agent 2
    Observer2 got profile: Agent 2
    */
    }

    }