RxJava's repeatWhen and retryWhen, explained
repeatWhen
and retryWhen
are fairly baffling at first glance. For starters, they are serious contenders for "most confusing marble diagrams":
They're useful operators: they allow you to conditionally resubscribe to Observables
that have terminated. I recently studied how they worked and I want to try to explain them (since it took me a while to grasp).
Repeat vs. Retry
First of all, what is the difference between their simpler counterparts: repeat()
and retry
? This part is simple: the difference is which terminal event causes a resubscription.
repeat()
resubscribes when it receives onCompleted()
.
retry()
resubscribes when it receives onError()
.
However, these simple versions can leave something to be desired. What if you want to delay resubscription by a couple seconds? Or examine the error to determine if you should resubscribe? That's where repeatWhen
and retryWhen
step in; they let you provide custom logic for the retries.
Notification Handler
You provide the retry logic through a function known as the notificationHandler
. Here's the method signature for retryWhen
:
retryWhen(Func1<? super Observable<? extends java.lang.Throwable>,? extends Observable<?>> notificationHandler)
That's a mouthful! I found this signature hard to parse since it's a mess of generics.
Simplified, it consists of three parts:
- The
Func1
is a factory for providing your retry logic. - The input is an
Observable<Throwable>
. - The output is an
Observable<?>
.
Let's look at the last part first. The emissions of the Observable<?>
returned determines whether or not resubscription happens. If it emits onCompleted
or onError
then it doesn't resubscribe. But if it emits onNext
then it does (regardless of what's actually in the onNext
). That's why it's using the wildcard for its generic type: it's just the notification (next, error or completed) that matters.
The input is an Observable<Throwable>
that emits anytime the source calls onError(Throwable)
. In other words, it triggers each time you need to decide whether to retry or not.
The factory Func1
is called on subscription to setup the retry logic. That way, when onError
is called, you've already defined how to handle it.
Here's an example wherein we resubscribe to the source if the Throwable
is an IOException
, but otherwise do not:
source.retryWhen(errors -> errors.flatMap(error -> {
// For IOExceptions, we retry
if (error instanceof IOException) {
return Observable.just(null);
}
// For anything else, don't retry
return Observable.error(error);
})
)
Each error is flatmapped so that we can either return onNext(null)
(to trigger a resubscription) or onError(error)
(to avoid resubscription).
Observations
Here's some key points about repeatWhen
and retryWhen
which you should keep in mind.
-
repeatWhen
is identical toretryWhen
, only it responds toonCompleted
instead ofonError
. The input isObservable<Void>
, sinceonCompleted
has no type. -
The
notificationHandler
(i.e.Func1
) is only called once per subscription. This makes sense as you are given anObservable<Throwable>
, which can emit any number of errors. -
The output
Observable
has to use the inputObservable
as its source. You must react to theObservable<Throwable>
and emit based on it; you can't just return a generic stream.In other words, you can't do something like
retryWhen(errors -> Observable.just(null))
. Not only will it not work, it completely breaks your sequence. You need to, at the very least, return the input, likeretryWhen(errors -> errors)
(which, by the way, duplicates the logic of the simplerretry()
). -
The
Observable
input only triggers on terminal events (onCompleted
forrepeatWhen
/onError
forretryWhen
). It doesn't receive any of theonNext
notifications from the source, so you can't examine the emitted data to determine if you should resubscribe. If you want to do that, you have to add an operator liketakeUntil()
to cut off the stream.
Uses
Now that you (vaguely) understand retryWhen
and repeatWhen
, what sort of logic can you stick in the notificationHandler
?
Poll for data periodically using repeatWhen
+ delay
:
source.repeatWhen(completed -> completed.delay(5, TimeUnit.SECONDS))
The source isn't resubscribed until the notificationHandler
emits onNext()
. Since delay
waits some time before emitting onNext()
, this has the effect of delaying resubscription so you can avoid constantly polling data.
Alternatively, delay resubscription with flatMap
+ timer
:
source.retryWhen(errors -> errors.flatMap(error -> Observable.timer(5, TimeUnit.SECONDS)))
This alternative is useful when combined with other logic, such as...
Resubscribe a limited number of times with zip
+ range
:
source.retryWhen(errors -> errors.zipWith(Observable.range(1, 3), (n, i) -> i))
The end result is that each error is paired with one of the outputs of range
, like so:
zip(error1, 1) -> onNext(1) <-- Resubscribe
zip(error2, 2) -> onNext(2) <-- Resubscribe
zip(error3, 3) -> onNext(3) <-- Resubscribe
onCompleted() <-- No resubscription
Since range(1, 3)
runs out of numbers on the fourth error, it calls onCompleted()
, which causes the entire zip
to complete. This prevents further retries.
Combine the above for limited retries with variable delays:
source.retryWhen(errors ->
errors
.zipWith(Observable.range(1, 3), (n, i) -> i)
.flatMap(retryCount -> Observable.timer((long) Math.pow(5, retryCount), TimeUnit.SECONDS))
);
flatMap
+ timer
is preferable over delay
in this case because it lets us modify the delay by the number of retries. The above retries three times and delays each retry by 5 ^ retryCount
, giving you exponential backoff with just a handful of operators!