RxJava's repeatWhen and retryWhen, explained

repeatWhen and retryWhen are fairly baffling at first glance. For starters, they are serious contenders for "most confusing marble diagrams":

They're useful operators: they allow you to conditionally resubscribe to Observables that have terminated. I recently studied how they worked and I want to try to explain them (since it took me a while to grasp).

Repeat vs. Retry

First of all, what is the difference between their simpler counterparts: repeat() and retry? This part is simple: the difference is which terminal event causes a resubscription.

repeat() resubscribes when it receives onCompleted().

retry() resubscribes when it receives onError().

However, these simple versions can leave something to be desired. What if you want to delay resubscription by a couple seconds? Or examine the error to determine if you should resubscribe? That's where repeatWhen and retryWhen step in; they let you provide custom logic for the retries.

Notification Handler

You provide the retry logic through a function known as the notificationHandler. Here's the method signature for retryWhen:

retryWhen(Func1<? super Observable<? extends java.lang.Throwable>,? extends Observable<?>> notificationHandler)  

That's a mouthful! I found this signature hard to parse since it's a mess of generics.

Simplified, it consists of three parts:

  1. The Func1 is a factory for providing your retry logic.
  2. The input is an Observable<Throwable>.
  3. The output is an Observable<?>.

Let's look at the last part first. The emissions of the Observable<?> returned determines whether or not resubscription happens. If it emits onCompleted or onError then it doesn't resubscribe. But if it emits onNext then it does (regardless of what's actually in the onNext). That's why it's using the wildcard for its generic type: it's just the notification (next, error or completed) that matters.

The input is an Observable<Throwable> that emits anytime the source calls onError(Throwable). In other words, it triggers each time you need to decide whether to retry or not.

The factory Func1 is called on subscription to setup the retry logic. That way, when onError is called, you've already defined how to handle it.

Here's an example wherein we resubscribe to the source if the Throwable is an IOException, but otherwise do not:

source.retryWhen(errors -> errors.flatMap(error -> {  
    // For IOExceptions, we  retry
    if (error instanceof IOException) {
      return Observable.just(null);
    }

    // For anything else, don't retry
    return Observable.error(error);
  })
)

Each error is flatmapped so that we can either return onNext(null) (to trigger a resubscription) or onError(error) (to avoid resubscription).

Observations

Here's some key points about repeatWhen and retryWhen which you should keep in mind.

  • repeatWhen is identical to retryWhen, only it responds to onCompleted instead of onError. The input is Observable<Void>, since onCompleted has no type.

  • The notificationHandler (i.e. Func1) is only called once per subscription. This makes sense as you are given an Observable<Throwable>, which can emit any number of errors.

  • The output Observable has to use the input Observable as its source. You must react to the Observable<Throwable> and emit based on it; you can't just return a generic stream.

    In other words, you can't do something like retryWhen(errors -> Observable.just(null)). Not only will it not work, it completely breaks your sequence. You need to, at the very least, return the input, like retryWhen(errors -> errors) (which, by the way, duplicates the logic of the simpler retry()).

  • The Observable input only triggers on terminal events (onCompleted for repeatWhen / onError for retryWhen). It doesn't receive any of the onNext notifications from the source, so you can't examine the emitted data to determine if you should resubscribe. If you want to do that, you have to add an operator like takeUntil() to cut off the stream.

Uses

Now that you (vaguely) understand retryWhen and repeatWhen, what sort of logic can you stick in the notificationHandler?

Poll for data periodically using repeatWhen + delay:

source.repeatWhen(completed -> completed.delay(5, TimeUnit.SECONDS))  

The source isn't resubscribed until the notificationHandler emits onNext(). Since delay waits some time before emitting onNext(), this has the effect of delaying resubscription so you can avoid constantly polling data.

Alternatively, delay resubscription with flatMap + timer:

source.retryWhen(errors -> errors.flatMap(error -> Observable.timer(5, TimeUnit.SECONDS)))  

This alternative is useful when combined with other logic, such as...

Resubscribe a limited number of times with zip + range:

source.retryWhen(errors -> errors.zipWith(Observable.range(1, 3), (n, i) -> i))  

The end result is that each error is paired with one of the outputs of range, like so:

zip(error1, 1) -> onNext(1)  <-- Resubscribe  
zip(error2, 2) -> onNext(2)  <-- Resubscribe  
zip(error3, 3) -> onNext(3)  <-- Resubscribe  
onCompleted()                <-- No resubscription  

Since range(1, 3) runs out of numbers on the fourth error, it calls onCompleted(), which causes the entire zip to complete. This prevents further retries.

Combine the above for limited retries with variable delays:

source.retryWhen(errors ->  
  errors
    .zipWith(Observable.range(1, 3), (n, i) -> i)
    .flatMap(retryCount -> Observable.timer((long) Math.pow(5, retryCount), TimeUnit.SECONDS))
);

flatMap + timer is preferable over delay in this case because it lets us modify the delay by the number of retries. The above retries three times and delays each retry by 5 ^ retryCount, giving you exponential backoff with just a handful of operators!

comments powered by Disqus