Last active
October 1, 2017 08:09
-
-
Save jemshit/f29fadfa5198bc05df28ce8b472ddb76 to your computer and use it in GitHub Desktop.
Revisions
-
jemshit revised this gist
Oct 1, 2017 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -39,7 +39,7 @@ private static class ProfileRepository { private Relay<String> relay = BehaviorRelay.<String>create().toSerialized(); Observable<String> getProfile() { return Observable.concat(Observable.defer(() -> localDataSource.getProfile()), Observable.defer(() -> remoteDataSource.getProfile())) .filter(profile -> !profile.equalsIgnoreCase("expired")) // simulating if local data source is expired .take(1) // get only from first Observable source which passes filter .compose(upstream -> upstream.doOnNext(profile -> relay.accept(profile))) -
jemshit created this gist
Oct 1, 2017 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,97 @@ import com.jakewharton.rxrelay2.BehaviorRelay; import com.jakewharton.rxrelay2.Relay; import io.reactivex.Observable; import java.util.concurrent.TimeUnit; public class ColdReactiveDataExpiration { // Problems to solve: // 1- UI components should be notified when data is updated // 2- Offline support, presentation layer always listens to local data source. So whenever DB is updated, UI should be notified. // 3- Error propagation to UI when ... // 4- Sometimes get fresh data from API private static class LocalDataSource { private String profile = "John"; Observable<String> getProfile() { return Observable.just(profile); } void setProfile(String profile) { this.profile = profile; } } private static class RemoteDataSource { private String profile = "David"; Observable<String> getProfile() { return Observable.just(profile); } } private static class ProfileRepository { private LocalDataSource localDataSource = new LocalDataSource(); private RemoteDataSource remoteDataSource = new RemoteDataSource(); private Relay<String> relay = BehaviorRelay.<String>create().toSerialized(); Observable<String> getProfile() { return Observable.concat(localDataSource.getProfile(), remoteDataSource.getProfile()) .filter(profile -> !profile.equalsIgnoreCase("expired")) // simulating if local data source is expired .take(1) // get only from first Observable source which passes filter .compose(upstream -> upstream.doOnNext(profile -> relay.accept(profile))) .flatMap(profile -> relay.hide().distinctUntilChanged()); // to not to get same item (state) twice } void updateProfileLocalOnly(String profile) { localDataSource.setProfile(profile); relay.accept(profile); } } public static void main(String[] args) throws InterruptedException { ProfileRepository repository = new ProfileRepository(); // Listen to profile repository.getProfile() .subscribe(profile -> System.out.println("Observer1 got profile: " + profile)); // Update profile from somewhere Observable.intervalRange(0, 5, 1, 1, TimeUnit.SECONDS) // start, count, initialDelay, period, unit .doOnNext(integer -> System.out.println("Updating profile from some place")) .subscribe(integer -> { if (integer == 3) repository.updateProfileLocalOnly("expired"); // simulate expiration of local data source else repository.updateProfileLocalOnly("Agent " + integer); // update data }); // Second Observer listens to the same data after a delay. Should get last item emitted + new updates Observable.defer(() -> repository.getProfile()) .delaySubscription(4, TimeUnit.SECONDS) // delays subscription (for only cold observable i guess) .subscribe(profile -> System.out.println("Observer2 got profile: " + profile)); Thread.sleep(7000); // Prints: /* Observer1 got profile: John Updating profile from some place Observer1 got profile: Agent 0 Updating profile from some place Observer1 got profile: Agent 1 Updating profile from some place Observer1 got profile: Agent 2 Updating profile from some place Observer1 got profile: expired Observer1 got profile: David Observer2 got profile: David Updating profile from some place Observer1 got profile: Agent 4 Observer2 got profile: Agent 4 */ } }