Grokking RxJava, Part 4: Reactive Android

In parts 1, 2, and 3 I covered how RxJava works (in a general sense). But as an Android developer, how do you make it work for you? Here is some practical information for Android developers.

RxAndroid

RxAndroid is an extension to RxJava built just for Android. It includes special bindings that will make your life easier.

First, there's AndroidSchedulers, which provides schedulers ready-made for Android's threading system. Need to run some code on the UI thread? No problem - just use AndroidSchedulers.mainThread():

retrofitService.getImage(url)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(bitmap -> myImageView.setImageBitmap(bitmap));

If you've got your own Handler, you can create a scheduler linked to it with HandlerThreadScheduler [1].

Next we have AndroidObservable, which provides more facilities for working within the Android lifecycle. There are bindActivity() and bindFragment(), which, in addition to automatically using AndroidSchedulers.mainThread() for observing, will also stop emitting items when your Activity or Fragment is finishing (so you don't accidentally try to change state after it is no longer valid to do so).

AndroidObservable.bindActivity(this, retrofitService.getImage(url))
    .subscribeOn(Schedulers.io())
    .subscribe(bitmap -> myImageView.setImageBitmap(bitmap));

I also like AndroidObservable.fromBroadcast(), which allows you to create an Observable that works like a BroadcastReceiver. Here's a way to be notified whenever network connectivity changes:

IntentFilter filter = new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION);
AndroidObservable.fromBroadcast(context, filter)
    .subscribe(intent -> handleConnectivityChange(intent));

Finally, there is ViewObservable, which adds a couple of bindings for Views. There's ViewObservable.clicks() if you want to get an event each time a View is clicked, or ViewObservable.text() to observe whenever a TextView changes its content.

ViewObservable.clicks(mCardNameEditText, false)
    .subscribe(view -> handleClick(view));

Retrofit

There's one notable library that supports RxJava: Retrofit, a popular REST client for Android. Normally when you define an asynchronous method you add a Callback:

@GET("/user/{id}/photo")
void getUserPhoto(@Path("id") int id, Callback<Photo> cb);

With RxJava installed, you can have it return an Observable instead:

@GET("/user/{id}/photo")
Observable<Photo> getUserPhoto(@Path("id") int id);

Now you can hook into the Observable any way you want; not only will you get your data but you can transform it, too!

Retrofit support for Observable also makes it easy to combine multiple REST calls together. For example, suppose we have one call that gets the photo and a second that gets the metadata. We can zip the results together:

Observable.zip(
    service.getUserPhoto(id),
    service.getPhotoMetadata(id),
    (photo, metadata) -> createPhotoWithData(photo, metadata))
    .subscribe(photoWithData -> showPhoto(photoWithData));

I showed a similar example to this in part 2 (using flatMap()). I wanted to demonstrate how easy it is to combine multiple REST calls into one with RxJava + Retrofit.
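If you want to play with the "wait for both, then combine" idea without a server, here is a rough plain-Java analogy. It is not RxJava or Retrofit code: CompletableFuture stands in for an Observable that emits a single item, and the two call methods and their string results are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

// Plain-Java analogy for zipping two single-result calls; NOT RxJava or Retrofit code.
class ZipSketch {
    // Hypothetical stand-ins for the two REST calls.
    static CompletableFuture<String> getUserPhoto(int id) {
        return CompletableFuture.supplyAsync(() -> "photo-" + id);
    }

    static CompletableFuture<String> getPhotoMetadata(int id) {
        return CompletableFuture.supplyAsync(() -> "metadata-" + id);
    }

    // Starts both calls independently and combines the results once both are in,
    // which is roughly what zip() does for Observables that emit one item each.
    static CompletableFuture<String> photoWithData(int id) {
        return getUserPhoto(id).thenCombine(getPhotoMetadata(id),
            (photo, metadata) -> photo + " + " + metadata);
    }

    public static void main(String[] args) {
        System.out.println(photoWithData(1).join());
    }
}
```

The analogy only covers the single-item case; zip() on longer streams keeps pairing items up one by one.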

Old, Slow Code

It's neat that Retrofit can return Observables, but what if you've got another library that doesn't support it? Or some internal code you want to convert to Observables? Basically, how do you connect old code to new without rewriting everything?

Observable.just() and Observable.from() will suffice for creating an Observable from older code most of the time:

private Object oldMethod() { ... }

public Observable<Object> newMethod() {
    return Observable.just(oldMethod());
}

That works well if oldMethod() is fast, but what if it's slow? It'll block the thread because you're calling oldMethod() before passing its result to Observable.just().

To get around that problem, here's a trick I use all the time - wrapping the slower part with defer():

private Object slowBlockingMethod() { ... }

public Observable<Object> newMethod() {
    return Observable.defer(() -> Observable.just(slowBlockingMethod()));
}

Now, the Observable returned won't call slowBlockingMethod() until you subscribe to it.
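To see the difference in isolation, here is a rough plain-Java analogy. It is not RxJava itself: a Supplier stands in for the Observable, get() stands in for subscribing, and the call counter is something I added purely so the timing is visible.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Plain-Java analogy for just() vs. defer(); this is NOT RxJava code.
class DeferSketch {
    // Counts how many times the (pretend) slow method has run.
    static final AtomicInteger calls = new AtomicInteger();

    // Hypothetical stand-in for slowBlockingMethod().
    static String slowBlockingMethod() {
        calls.incrementAndGet();
        return "result";
    }

    // Like Observable.just(slowBlockingMethod()): the method runs right here,
    // before the caller ever asks for the value.
    static Supplier<String> eager() {
        String value = slowBlockingMethod();
        return () -> value;
    }

    // Like Observable.defer(...): nothing runs until get() is called,
    // and it runs again on every get(), as defer() does for each subscriber.
    static Supplier<String> deferred() {
        return () -> slowBlockingMethod();
    }

    public static void main(String[] args) {
        Supplier<String> d = deferred();
        System.out.println("calls after deferred(): " + calls.get());
        d.get();
        System.out.println("calls after get(): " + calls.get());
    }
}
```

Keep in mind that deferring only moves the work to subscription time; it still runs on whatever thread subscribes, so you would pair it with subscribeOn() to keep it off the UI thread.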

Lifecycle

I saved the hardest for last. How do you handle the Activity lifecycle? There are two issues that crop up over and over again:

  1. Continuing a Subscription during a configuration change (e.g. rotation).

    Suppose you make a REST call with Retrofit and then want to display the outcome in a ListView. What if the user rotates the screen? You want to continue the same request, but how?

  2. Memory leaks caused by Observables which retain a reference to the Context.

    This problem is caused by creating a subscription that retains the Context somehow, which is not difficult when you're interacting with Views! If the Observable doesn't complete on time, you may end up retaining a lot of extra memory.

Unfortunately, there are no silver bullets for either problem, but there are some guidelines you can follow to make your life easier.


The first problem can be solved with some of RxJava's built-in caching mechanisms, so that you can unsubscribe/resubscribe to the same Observable without it duplicating its work. In particular, cache() (or replay()) will continue the underlying request (even if you unsubscribe). That means you can resume with a new subscription after Activity recreation:

Observable<Photo> request = service.getUserPhoto(id).cache();
Subscription sub = request.subscribe(photo -> handleUserPhoto(photo));

// ...When the Activity is being recreated...
sub.unsubscribe();

// ...Once the Activity is recreated...
request.subscribe(photo -> handleUserPhoto(photo));

Note that we're using the same cached request in both cases; that way the underlying call only happens once. Where you store request I leave up to you, but like all lifecycle solutions, it must be stored somewhere outside the lifecycle (a retained fragment, a singleton, etc).
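As one possible shape for "somewhere outside the lifecycle", here is a plain-Java sketch of a holder that hands the same in-flight request to whoever asks. It is an analogy, not RxJava: CompletableFuture stands in for the cached Observable (and, unlike cache(), starts the work immediately instead of on first subscription), while the holder, the fake fetch method and the counter are all hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java analogy for keeping a cached request outside the Activity; NOT RxJava code.
class CachedRequestSketch {
    // Counts how often the (pretend) network call actually runs.
    static final AtomicInteger networkCalls = new AtomicInteger();

    // Lives in a static field here; a retained fragment or singleton would do the same job.
    static final Map<Integer, CompletableFuture<String>> requests = new ConcurrentHashMap<>();

    // Hypothetical stand-in for service.getUserPhoto(id).
    static String fetchUserPhoto(int id) {
        networkCalls.incrementAndGet();
        return "photo-" + id;
    }

    // Returns the existing request for this id, starting one only if none exists yet.
    static CompletableFuture<String> photoRequest(int id) {
        return requests.computeIfAbsent(id,
            key -> CompletableFuture.supplyAsync(() -> fetchUserPhoto(key)));
    }

    public static void main(String[] args) {
        photoRequest(42);                                     // the "old" Activity asks
        // ...rotation: the old Activity goes away, the request keeps running...
        CompletableFuture<String> again = photoRequest(42);   // the "new" Activity asks
        again.thenAccept(photo -> System.out.println("got " + photo)).join();
        System.out.println("network calls: " + networkCalls.get());
    }
}
```

A real holder also needs a way to evict finished or failed requests, which this sketch leaves out.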


The second problem can be solved by properly unsubscribing from your subscriptions in accordance with the lifecycle. It's a common pattern to use a CompositeSubscription to hold all of your Subscriptions, and then unsubscribe all at once in onDestroy() or onDestroyView():

private CompositeSubscription mCompositeSubscription
    = new CompositeSubscription();

private void doSomething() {
    mCompositeSubscription.add(
        AndroidObservable.bindActivity(this, Observable.just("Hello, World!"))
            .subscribe(s -> System.out.println(s)));
}

@Override
protected void onDestroy() {
    super.onDestroy();
    
    mCompositeSubscription.unsubscribe();
}

For bonus points you can create a root Activity/Fragment that comes with a CompositeSubscription that you can add to and is later automatically unsubscribed.

A warning! Once you call CompositeSubscription.unsubscribe() the object is unusable, as it will automatically unsubscribe anything you add to it afterwards! You must create a new CompositeSubscription as a replacement if you plan on re-using this pattern later.
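The warning, and the root Activity/Fragment idea before it, can be modelled in a few lines of plain Java. Everything below is a simplified, hypothetical stand-in (FakeSubscription, FakeComposite, BaseScreen), not the real RxJava or Android classes; it only mimics the behaviour described above so you can run it anywhere.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the behaviour described above. These classes are
// hypothetical stand-ins, NOT the real rx.Subscription or CompositeSubscription.
class CompositeSketch {
    static class FakeSubscription {
        private boolean unsubscribed;
        void unsubscribe() { unsubscribed = true; }
        boolean isUnsubscribed() { return unsubscribed; }
    }

    static class FakeComposite {
        private final List<FakeSubscription> children = new ArrayList<>();
        private boolean unsubscribed;

        void add(FakeSubscription s) {
            if (unsubscribed) {
                s.unsubscribe();   // too late: the newcomer is cut off immediately
            } else {
                children.add(s);
            }
        }

        void unsubscribe() {
            unsubscribed = true;
            for (FakeSubscription s : children) {
                s.unsubscribe();
            }
            children.clear();
        }
    }

    // Stand-in for a root Activity/Fragment that owns the composite.
    static class BaseScreen {
        private FakeComposite composite = new FakeComposite();

        void track(FakeSubscription s) { composite.add(s); }

        // Call from whichever teardown callback you pair with your subscriptions.
        void releaseSubscriptions() {
            composite.unsubscribe();
            composite = new FakeComposite();   // fresh one, so track() keeps working
        }
    }

    public static void main(String[] args) {
        BaseScreen screen = new BaseScreen();
        FakeSubscription first = new FakeSubscription();
        screen.track(first);
        screen.releaseSubscriptions();
        FakeSubscription second = new FakeSubscription();
        screen.track(second);
        System.out.println("first unsubscribed: " + first.isUnsubscribed());
        System.out.println("second unsubscribed: " + second.isUnsubscribed());
    }
}
```

Swapping in a new composite right after unsubscribing is the design choice that matters here: subclasses never see the dead one, so they cannot trip over the warning.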

Solutions to both problems involve adding code; I'm hoping that someday a genius comes by and figures out how to solve these problems without all the boilerplate.

Conclusion?

There isn't one yet for Android. RxJava is still rather new and adoption of it on Android is even newer. People are still figuring this stuff out; RxAndroid is in active development and there aren't any great samples out there yet. I bet that a year from now some of the advice I've given here will be considered quaint.

In the meantime, I find that RxJava not only makes coding easier but a bit more fun. If you're still not convinced, find me sometime and we'll talk about it over a beer.

Thanks again to Matthias Kay for proofreading this article. Join him in making RxAndroid awesome!


[1] In fact, AndroidSchedulers.mainThread() uses a HandlerThreadScheduler internally.