Skip to content

Instantly share code, notes, and snippets.

@sddamico
Last active April 29, 2018 04:26
Show Gist options
  • Select an option

  • Save sddamico/c45d7cdabc41e663bea1 to your computer and use it in GitHub Desktop.

Select an option

Save sddamico/c45d7cdabc41e663bea1 to your computer and use it in GitHub Desktop.
Exponential Backoff Transformer
/**
* @param intervalMs The base interval to start backing off from. The function is: attemptNum^2 * intervalMs
* @param retryAttempts The max number of attempts to retry this task or -1 to try MAX_INT times,
*/
public static <T> Observable.Transformer<T, T> backoff(final long intervalMs, final int retryAttempts) {
return new Observable.Transformer<T, T>() {
@Override
public Observable<T> call(final Observable<T> observable) {
return observable.retryWhen(
retryFunc(intervalMs, retryAttempts),
Schedulers.immediate()
);
}
};
}
private static Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> retryFunc(final long ms, final int attempts) {
return new Func1<Observable<? extends Throwable>, Observable<Long>>() {
@Override
public Observable<Long> call(Observable<? extends Throwable> observable) {
// zip our number of retries to the incoming errors so that we only produce retries
// when there's been an error
return observable.zipWith(
Observable.range(1, attempts > 0 ? attempts : Integer.MAX_VALUE),
new Func2<Throwable, Integer, Integer>() {
@Override
public Integer call(Throwable throwable, Integer attemptNumber) {
return attemptNumber;
}
})
// flatMap the int attempt number to a timer that will wait the specified delay
.flatMap(new Func1<Integer, Observable<Long>>() {
@Override
public Observable<Long> call(final Integer integer) {
long newInterval = ms * ((long) integer * (long) integer);
if (newInterval < 0) {
newInterval = Long.MAX_VALUE;
}
// use Schedulers#immediate() to keep on same thread
return Observable.timer(newInterval, TimeUnit.MILLISECONDS, Schedulers.immediate());
}
});
}
};
}
@plastiv
Copy link

plastiv commented Jul 3, 2015

I understand that overwriting default scheduler with immediate() would save switching between threads and keeping a main observable worker thread from releasing back to the pool. Is it a correct assumption?

@sddamico
Copy link
Author

sddamico commented Jul 7, 2015

@plastiv sorry for missing this comment, but yes you're correct.

@sddamico
Copy link
Author

sddamico commented Jul 7, 2015

OP updated with TimeUnit to be better aligned with mainline api's that work with time.

@DavidMihola
Copy link

Just to make sure I understanding/using this correctly: This does never actually forward the last error, right? That is, after the last retry I will get an onCompleted() in my Subscriber and not an onError?

@ltn614
Copy link

ltn614 commented Nov 25, 2015

I think there is a problem in your code, since you use Schedulers.immediate() in the backoff, you shouldn't use
it again in retryFunc's Observable.timer, it will cause the other operation in retryFunc block, am I right?

@roman-yu
Copy link

@DavidMihola, I think you are right, I have encountered exactly the problem you mentioned.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment