Grokking RxJava, Part 3: Reactive with Benefits
In part 1, I went over the basic structure of RxJava. In part 2, I showed you how powerful operators could be. But maybe you're still not sold; there isn't quite enough there yet to convince you. Here's some of the other benefits that come along with the RxJava framework which should seal the deal.
Error Handling
Up until this point, we've largely been ignoring onComplete()
and onError()
. They mark when an Observable
is going to stop emitting items and the reason for why (either a successful completion, or an unrecoverable error).
Our original Subscriber
had the capability to listen to onComplete()
and onError()
. Let's actually do something with them:
Observable.just("Hello, world!")
.map(s -> potentialException(s))
.map(s -> anotherPotentialException(s))
.subscribe(new Subscriber<String>() {
@Override
public void onNext(String s) { System.out.println(s); }
@Override
public void onCompleted() { System.out.println("Completed!"); }
@Override
public void onError(Throwable e) { System.out.println("Ouch!"); }
});
Let's say potentialException()
and anotherPotentialException()
both have the possibility of throwing Exceptions
. Every Observable
ends with either a single call to onCompleted()
or onError()
. As such, the output of the program will either be a String followed by "Completed!" or it will just be "Ouch!" (because an Exception
is thrown).
There's a few takeaways from this pattern:
-
onError()
is called if anException
is thrown at any time.This makes error handling much simpler. I can just handle every error at the end in a single function.
-
The operators don't have to handle the
Exception
.You can leave it up to the
Subscriber
to determine how to handle issues with any part of theObservable
chain becauseExceptions
skip ahead toonError()
. -
You know when the
Subscriber
has finished receiving items.Knowing when a task is done helps the flow of your code. (Though it is possible that an
Observable
may never complete.)
I find this pattern a lot easier than traditional error handling. With callbacks, you have to handle errors in each callback. Not only does that lead to repetitious code, but it also means that each callback must know how to handle errors, meaning your callback code is tightly coupled to the caller.
With RxJava's pattern, your Observable
doesn't even have to know what to do with errors! Nor do any of your operators have to handle error states - they'll be skipped in cases of critical failure. You can leave all your error handling to the Subscriber
.
Schedulers
You've got an Android app that makes a network request. That could take a long time, so you load it in another thread. Suddenly, you've got problems!
Multi-threaded Android applications are difficult because you have to make sure to run the right code on the right thread; mess up and your app can crash. The classic exception occurs when you try to modify a View off of the main thread.
In RxJava, you can tell your Observable
code which thread to run on using subscribeOn()
, and which thread your Subscriber
should run on using observeOn()
:
myObservableServices.retrieveImage(url)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(bitmap -> myImageView.setImageBitmap(bitmap));
How simple is that? Everything that runs before my Subscriber
runs on an I/O thread. Then in the end, my View manipulation happens on the main thread1.
The great part about this is that I can attach subscribeOn()
and observeOn()
to any Observable
! They're just operators! I don't have to worry about what the Observable
or its previous operators are doing; I can just stick this at the end for easy threading2.
With an AsyncTask
or the like, I have to design my code around which parts of the code I want to run concurrently. With RxJava, my code stays the same - it's just got a touch of concurrency added on.
Subscriptions
There's something I've been hiding from you. When you call Observable.subscribe()
, it returns a Subscription
. This represents the link between your Observable
and your Subscriber
:
Subscription subscription = Observable.just("Hello, World!")
.subscribe(s -> System.out.println(s));
You can use this Subscription
to sever the link later on:
subscription.unsubscribe();
System.out.println("Unsubscribed=" + subscription.isUnsubscribed());
// Outputs "Unsubscribed=true"
What's nice about how RxJava handles unsubscribing is that it stops the chain. If you've got a complex chain of operators, using unsubscribe
will terminate wherever it is currently executing code3. No unnecessary work needs to be done!
Conclusion
Keep in mind that these articles are an introduction to RxJava. There's a lot more to learn than what I presented and it's not all sunshine and daisies (for example, read up on backpressure). Nor would I use reactive code for everything - I reserve it for the more complex parts of the code that I want to break into simpler logic.
Originally, I had planned for this post to be the conclusion of the series, but a common request has been for some practical RxJava examples in Android, so you can now continue onwards to part 4. I hope that this introduction is enough to get you started on a fun framework. If you want to learn more, I suggest reading the official RxJava wiki. And remember: the infinite is possible.
Many thanks to all the people who took the time to proofread these articles: Matthias Käppler, Matthew Wear, Ulysses Popple, Hamid Palo and Joel Drotos (worth the click for the beard alone).
1 This is one reason why I try to keep my Subscriber
as lightweight as possible; I want to minimize how much I block the main thread.
2 Deferring calls to observeOn()
and subscribeOn()
is good practice because it gives the Subscriber
more flexibility to handle processing as it wants. For instance, an Observable
might take a while, but if the Subscriber
is already in an I/O thread you wouldn't need to observe it on a new thread.
3 In part 1 I noted that Observable.just()
is a little more complex than just calling onNext()
and onComplete()
. The reason is subscriptions; it actually checks if the Subscriber
is still subscribed before calling onNext()
.