Skip to content

Instantly share code, notes, and snippets.

@jemshit
Last active October 1, 2017 08:09
Show Gist options
  • Select an option

  • Save jemshit/f29fadfa5198bc05df28ce8b472ddb76 to your computer and use it in GitHub Desktop.

Select an option

Save jemshit/f29fadfa5198bc05df28ce8b472ddb76 to your computer and use it in GitHub Desktop.

Revisions

  1. jemshit revised this gist Oct 1, 2017. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion ColdReactiveDataExpiration.java
    Original file line number Diff line number Diff line change
    @@ -39,7 +39,7 @@ private static class ProfileRepository {
    private Relay<String> relay = BehaviorRelay.<String>create().toSerialized();

    Observable<String> getProfile() {
    return Observable.concat(localDataSource.getProfile(), remoteDataSource.getProfile())
    return Observable.concat(Observable.defer(() -> localDataSource.getProfile()), Observable.defer(() -> remoteDataSource.getProfile()))
    .filter(profile -> !profile.equalsIgnoreCase("expired")) // simulating if local data source is expired
    .take(1) // get only from first Observable source which passes filter
    .compose(upstream -> upstream.doOnNext(profile -> relay.accept(profile)))
  2. jemshit created this gist Oct 1, 2017.
    97 changes: 97 additions & 0 deletions ColdReactiveDataExpiration.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,97 @@
    import com.jakewharton.rxrelay2.BehaviorRelay;
    import com.jakewharton.rxrelay2.Relay;
    import io.reactivex.Observable;

    import java.util.concurrent.TimeUnit;

    public class ColdReactiveDataExpiration {

    // Problems to solve:
    // 1- UI components should be notified when data is updated
    // 2- Offline support, presentation layer always listens to local data source. So whenever DB is updated, UI should be notified.
    // 3- Error propagation to UI when ...
    // 4- Sometimes get fresh data from API


    private static class LocalDataSource {
    private String profile = "John";

    Observable<String> getProfile() {
    return Observable.just(profile);
    }

    void setProfile(String profile) {
    this.profile = profile;
    }
    }

    private static class RemoteDataSource {
    private String profile = "David";

    Observable<String> getProfile() {
    return Observable.just(profile);
    }
    }

    private static class ProfileRepository {
    private LocalDataSource localDataSource = new LocalDataSource();
    private RemoteDataSource remoteDataSource = new RemoteDataSource();
    private Relay<String> relay = BehaviorRelay.<String>create().toSerialized();

    Observable<String> getProfile() {
    return Observable.concat(localDataSource.getProfile(), remoteDataSource.getProfile())
    .filter(profile -> !profile.equalsIgnoreCase("expired")) // simulating if local data source is expired
    .take(1) // get only from first Observable source which passes filter
    .compose(upstream -> upstream.doOnNext(profile -> relay.accept(profile)))
    .flatMap(profile -> relay.hide().distinctUntilChanged()); // to not to get same item (state) twice
    }

    void updateProfileLocalOnly(String profile) {
    localDataSource.setProfile(profile);
    relay.accept(profile);
    }
    }

    public static void main(String[] args) throws InterruptedException {
    ProfileRepository repository = new ProfileRepository();

    // Listen to profile
    repository.getProfile()
    .subscribe(profile -> System.out.println("Observer1 got profile: " + profile));

    // Update profile from somewhere
    Observable.intervalRange(0, 5, 1, 1, TimeUnit.SECONDS) // start, count, initialDelay, period, unit
    .doOnNext(integer -> System.out.println("Updating profile from some place"))
    .subscribe(integer -> {
    if (integer == 3)
    repository.updateProfileLocalOnly("expired"); // simulate expiration of local data source
    else
    repository.updateProfileLocalOnly("Agent " + integer); // update data
    });

    // Second Observer listens to the same data after a delay. Should get last item emitted + new updates
    Observable.defer(() -> repository.getProfile())
    .delaySubscription(4, TimeUnit.SECONDS) // delays subscription (for only cold observable i guess)
    .subscribe(profile -> System.out.println("Observer2 got profile: " + profile));

    Thread.sleep(7000);

    // Prints:
    /* Observer1 got profile: John
    Updating profile from some place
    Observer1 got profile: Agent 0
    Updating profile from some place
    Observer1 got profile: Agent 1
    Updating profile from some place
    Observer1 got profile: Agent 2
    Updating profile from some place
    Observer1 got profile: expired
    Observer1 got profile: David
    Observer2 got profile: David
    Updating profile from some place
    Observer1 got profile: Agent 4
    Observer2 got profile: Agent 4
    */
    }

    }