@dima716
Forked from shlomiassaf/demo.ts
Created May 7, 2018 09:22
RxJS cached polling operator with cache invalidation support in 10 LOC.
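import { Subject } from 'rxjs';
import { poll } from './poll'; // assumed path: wherever the operator below is saved
const reset$ = new Subject<void>();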
// the API call we want to poll
const poller$ = myApi.getData()
  .pipe(poll(reset$, 30000)); // 30 secs
// no polling yet...
const sub1 = poller$.subscribe(data => console.log('POLL TICK 1'));
const sub2 = poller$.subscribe(data => console.log(' & 2'));
// console: POLL TICK 1 & 2
// 30 secs passed ...
// console: POLL TICK 1 & 2
// 5 secs passed...
reset$.next();
// console: POLL TICK 1 & 2
// 30 secs passed ...
// console: POLL TICK 1 & 2
// 5 secs passed...
reset$.next();
// no console output, call cancelled.
reset$.next();
// console: POLL TICK 1 & 2
sub1.unsubscribe();
// 30 secs passed ...
// console: & 2
sub2.unsubscribe();
// NO MORE POLLING
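import { Observable } from 'rxjs';
import { delay, publishReplay, refCount, repeat, retryWhen, takeUntil } from 'rxjs/operators';

export const poll = (reset: Observable<any>, interval = 5000) => {
  return (source: Observable<any>) => source.pipe(
    delay(interval),  // deliver each result `interval` ms after it arrives
    repeat(),         // re-subscribe to the source once it completes: the polling loop
    retryWhen(errors => errors.pipe(delay(interval))), // on error, retry after `interval` ms
    takeUntil(reset), // a reset ends the current cycle, cancelling any pending call
    repeat(),         // ...and a fresh cycle starts right away
    publishReplay(1), // a weak shareReplay()
    refCount()        // poll only while at least one subscriber is attached
  );
};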
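
The behaviour sketched in the demo can be checked with a small self-contained run. This is only a sketch under a few assumptions: RxJS 6 or 7 is installed, `fakeApi$` and `runDemo` are hypothetical names that are not part of the original gist, and the 10 ms interval is chosen purely to keep the run short. The operator is repeated here so the snippet works on its own.

```typescript
import { defer, Observable, of, Subject } from 'rxjs';
import {
  delay, publishReplay, refCount, repeat, retryWhen, take, takeUntil, toArray
} from 'rxjs/operators';

// Same operator as in the gist, repeated so this sketch is self-contained.
const poll = (reset: Observable<any>, interval = 5000) =>
  (source: Observable<any>) => source.pipe(
    delay(interval),
    repeat(),
    retryWhen(errors => errors.pipe(delay(interval))),
    takeUntil(reset),
    repeat(),
    publishReplay(1),
    refCount()
  );

// Hypothetical harness (not in the original gist): two subscribers share one
// polling loop, and a reset cancels the first pending call.
export function runDemo(interval = 10): Promise<{ a: number[]; b: number[]; calls: number }> {
  let calls = 0;
  // Stand-in for myApi.getData(): every subscription counts as one API call.
  const fakeApi$ = defer(() => of(++calls));
  const reset$ = new Subject<void>();
  const poller$ = fakeApi$.pipe(poll(reset$, interval));

  // Collect the first three values a subscriber sees, then unsubscribe.
  const collect = () => new Promise<number[]>(resolve =>
    poller$.pipe(take(3), toArray()).subscribe(resolve));

  const a = collect(); // first subscriber: polling starts, call 1 is pending
  const b = collect(); // second subscriber: joins the same loop, no extra call
  reset$.next();       // cancels call 1 before it is delivered, starts call 2

  return Promise.all([a, b]).then(([first, second]) =>
    ({ a: first, b: second, calls }));
}

runDemo().then(result => console.log(result));
```

With this setup both subscribers should receive `[2, 3, 4]` and the stub should be hit four times in total: call 1 is cancelled by the reset before its result is delivered, the two subscribers never trigger separate calls, and polling stops once the last subscriber has unsubscribed.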