Error handling in RxJava

RxJava makes error handling a snap. Just use onError, right?

That's fine and dandy if you're the subscriber, but what about exceptions that occur inside your operators? How do you ensure that an Exception in, say, flatMap() gets passed along to onError? And what if you don't want the exception to terminate the sequence?

Unchecked Exceptions

For the most part, unchecked exceptions are automatically forwarded to onError. For example, the following code prints out "Error!":

Observable.just("Hello!")  
  .map(input -> { throw new RuntimeException(); })
  .subscribe(
    System.out::println,
    error -> System.out.println("Error!")
  );

Fatal exceptions are the... uh... exception. These are exceptions so severe that RxJava itself cannot keep operating, such as StackOverflowError. In these situations, the exception is re-thrown instead of being routed to onError, which typically crashes the thread it occurs on (and often the whole application). You can find a full list of fatal exceptions in Exceptions.throwIfFatal().
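To give a feel for what "fatal" means here, this is a rough, JDK-only approximation of that kind of check. It is not RxJava's source: FatalSketch and looksFatal are names I made up, and the real list in Exceptions.throwIfFatal() is longer (it also covers some RxJava-specific exception types).

```java
final class FatalSketch {

    // Hypothetical, partial check: true for JVM-level failures that
    // cannot sensibly be delivered to onError.
    static boolean looksFatal(Throwable t) {
        return t instanceof VirtualMachineError // StackOverflowError, OutOfMemoryError, ...
            || t instanceof LinkageError;       // NoClassDefFoundError, ...
    }

    public static void main(String[] args) {
        System.out.println(looksFatal(new StackOverflowError()));
        System.out.println(looksFatal(new RuntimeException()));
    }
}
```

An ordinary RuntimeException fails the check, which is why it ends up in onError like in the first example.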

Checked Exceptions

Suppose you've got this method signature:

String transform(String input) throws IOException;  

What if we want to call this method in an operator? Even though RxJava has its own system for handling errors, checked exceptions still must be handled by your code. That means you have to add your own try-catch block.

In this example, we take checked exceptions and convert them to unchecked exceptions:

Observable.just("Hello!")  
  .map(input -> {
    try {
      return transform(input);
    } catch (Throwable t) {
      throw Exceptions.propagate(t);
    }
  })
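If you're curious what a propagate-style helper does with the Throwable it receives, here is a minimal plain-Java sketch. It only approximates the behavior and is not the library's code; PropagateSketch, transformUnchecked, and the body of transform() are assumptions made up for illustration.

```java
import java.io.IOException;

final class PropagateSketch {

    // Hypothetical helper: unchecked throwables are rethrown untouched,
    // and only checked exceptions get wrapped in a RuntimeException.
    static RuntimeException propagate(Throwable t) {
        if (t instanceof RuntimeException) {
            throw (RuntimeException) t;
        }
        if (t instanceof Error) {
            throw (Error) t;
        }
        throw new RuntimeException(t);
    }

    // Stand-in for the article's transform(); fails on empty input.
    static String transform(String input) throws IOException {
        if (input.isEmpty()) {
            throw new IOException("empty input");
        }
        return input.toUpperCase();
    }

    // The kind of function you could hand to map().
    static String transformUnchecked(String input) {
        try {
            return transform(input);
        } catch (Throwable t) {
            throw propagate(t);
        }
    }

    public static void main(String[] args) {
        System.out.println(transformUnchecked("Hello!"));
        try {
            transformUnchecked("");
        } catch (RuntimeException e) {
            // The original IOException is still reachable as the cause.
            System.out.println("cause: " + e.getCause());
        }
    }
}
```

The return type of RuntimeException exists only so that callers can write "throw propagate(t)" and keep the compiler happy; the method never actually returns.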

I find it's easiest to deal with exceptions when they have the fewest wrappers around them. Exceptions.propagate() helps; it only wraps the exception if it's checked. Along those same lines, flatMap() (or any other operator that returns an Observable) can simply return Observable.error() instead of wrapping/throwing:

Observable.just("Hello!")  
  .flatMap(input -> {
    try {
      return Observable.just(transform(input));
    } catch (Throwable t) {
      return Observable.error(t);
    }
  })

You don't have to end up in onError for a checked exception, either. You can always handle the checked exception yourself and keep the sequence going. For example, map() could return an error value that onNext knows how to handle.
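One way to picture such an error value is a small wrapper that holds either the result or the exception. The sketch below is plain Java and entirely hypothetical: ErrorValueSketch, Result, safeTransform, describe, and the body of transform() are names and behavior I invented for illustration.

```java
import java.io.IOException;

final class ErrorValueSketch {

    // Hypothetical wrapper: holds either a value or the error that replaced it.
    static final class Result {
        final String value;    // null when the call failed
        final Throwable error; // null when the call succeeded

        private Result(String value, Throwable error) {
            this.value = value;
            this.error = error;
        }

        static Result success(String value) { return new Result(value, null); }
        static Result failure(Throwable error) { return new Result(null, error); }

        boolean isSuccess() { return error == null; }
    }

    // Stand-in for the article's transform(); fails on empty input.
    static String transform(String input) throws IOException {
        if (input.isEmpty()) {
            throw new IOException("empty input");
        }
        return input.toUpperCase();
    }

    // Suitable as the function passed to map(): an IOException becomes
    // a failure Result instead of being thrown.
    static Result safeTransform(String input) {
        try {
            return Result.success(transform(input));
        } catch (IOException e) {
            return Result.failure(e);
        }
    }

    // What an onNext handler might do with each item.
    static String describe(Result result) {
        return result.isSuccess()
            ? "Got " + result.value
            : "Failed: " + result.error.getMessage();
    }

    public static void main(String[] args) {
        System.out.println(describe(safeTransform("Hello!")));
        System.out.println(describe(safeTransform("")));
    }
}
```

In a real sequence you would pass safeTransform to map() and call something like describe() from onNext, so a failed item is just another emission and the stream keeps running.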

Masking Exceptions

I want to clear up something that many RxJava beginners get wrong: onError is an extreme event that should be reserved for times when sequences cannot continue. It means that there was a problem in processing the current item such that no future processing of any items can occur.

It's the Rx version of try-catch: it skips all code between the exception and onError. Sometimes you want this behavior: suppose you're downloading data and then saving it to disk. If the download fails, you want to skip saving. But what if you're, say, polling data from a source? If one poll fails, you still want to continue polling. Or maybe, in the previous example, you want to save dummy data if the download fails.

In other words: often, instead of calling onError and skipping code, you actually want to call onNext with an error state. It's much easier to handle problems in onNext since you still get to run all your code and the stream isn't terminated. [1]

How do we cover up exceptions instead of passing them along to onError? For that you have a couple of options:

  • onErrorReturn(), which replaces onError with a single onNext(value) (followed by onCompleted(), which logically follows the final emission).
  • onErrorResumeNext(), which replaces the current stream with an entirely new Observable.

You might use onErrorReturn() like this:

Observable.just("Request data...")  
  .map(this::dangerousOperation)
  .onErrorReturn(error -> "Empty result")

If dangerousOperation() throws an exception, the sequence now emits "Empty result" and completes instead of calling onError.

It's worth noting that, when using these operators, the upstream Observables are still going to shut down! They've already seen a terminal event (onError); all that onError[Return|ResumeNext] does is replace the onError notification with a different sequence downstream.

For example, if you have this sequence:

Observable.interval(1, TimeUnit.SECONDS)  
  .map(input -> { throw new RuntimeException(); })
  .onErrorReturn(error -> "Uh oh")
  .subscribe(System.out::println);

You might expect the interval to continue emitting after the map throws an exception, but it doesn't! Only the downstream subscribers avoid onError. The code above would just print "Uh oh" once.

If you want to fix things upstream, then you can look into using retry(). It re-subscribes to the source when it receives onError, giving your sequence a second chance.

Here's a stream that fails every once in a while, but every time it does, retry() re-subscribes to it:

Observable.interval(1, TimeUnit.SECONDS)  
  .map(input -> {
    if (Math.random() < .5) {
      throw new RuntimeException();
    }
    return "Success " + input;
  })
  .retry()
  .subscribe(System.out::println);

One has to be careful when using retry() because it's just re-calling subscribe() when it encounters an error, which creates a fresh new subscription (and all that entails). You might expect the above sequence to output "Success 0", "Success 1", "Success 2", etc. but it actually restarts at 0 every time an error occurs. That's because interval() starts counting at 0 each time it gets a new subscriber.
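Because Math.random() makes that behavior hard to see on demand, here is a deterministic plain-Java simulation of the same effect. It is not RxJava and involves no timing or threads: RetrySketch and collectWithRetry are hypothetical names, the inner loop stands in for a fresh interval() subscription, and the step function plays the role of map().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

final class RetrySketch {

    // Simulates retrying a cold counter: every attempt starts again at 0.
    // 'step' may throw; on failure we simply start a new attempt.
    static List<String> collectWithRetry(int itemsWanted, LongFunction<String> step) {
        List<String> seen = new ArrayList<>();
        while (seen.size() < itemsWanted) {
            try {
                // A fresh counter per attempt, like a new subscription.
                for (long i = 0; seen.size() < itemsWanted; i++) {
                    seen.add(step.apply(i));
                }
            } catch (RuntimeException e) {
                // Swallow the error and loop: the next attempt restarts at 0.
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        boolean[] failedOnce = {false};
        List<String> out = collectWithRetry(5, i -> {
            // Fail exactly once, the first time item 2 comes through.
            if (i == 2 && !failedOnce[0]) {
                failedOnce[0] = true;
                throw new RuntimeException("boom");
            }
            return "Success " + i;
        });
        // prints [Success 0, Success 1, Success 0, Success 1, Success 2]
        System.out.println(out);
    }
}
```

The items emitted before the failure are not replayed or remembered; the downstream just sees the count start over.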

Thanks to Erik Hellman and Jake Wharton for proofreading this article.


[1] Using onError incorrectly is one of the core problems with Observables in Retrofit 1. Any non-2xx response resulted in onError, which made composition difficult. This problem has been fixed in Retrofit 2; now you can use Observable<Response<Type>> or Observable<Result<Type>>, which allows you to handle non-2xx responses in onNext.
