Grokking RxJava, Part 2: Operator, Operator

In part 1 I went over the basic structure of RxJava, as well as introducing you to the map() operator. However, I can understand if you're still not compelled to use RxJava - you don't have much to work with yet. But that will change quickly - a big part of the power of RxJava is in all of the operators included in the framework.

Let's go through an example to introduce you to more operators.

The Setup

Suppose I have this method available:

// Returns a List of website URLs based on a text search
Observable<List<String>> query(String text); 

I want to make a robust system for searching text and displaying the results. Given what we know from the last article, this is what one might come up with:

query("Hello, world!")
    .subscribe(urls -> {
        for (String url : urls) {
            System.out.println(url);
        }
    });

This answer is highly unsatisfactory because I lose the ability to transform the data stream. If I wanted to modify each URL, I'd have to do it all in the Subscriber. We're tossing all our cool map() tricks out the window!

I could create a map() from urls -> urls, but then every map() call would have a for-each loop inside of it - ouch.

A Glimmer of Hope

There is a method, Observable.from(), that takes a collection of items and emits each them one at a time:

Observable.from("url1", "url2", "url3")
    .subscribe(url -> System.out.println(url));

That looks like it could help, let's see what happens:

query("Hello, world!")
    .subscribe(urls -> {
        Observable.from(urls)
            .subscribe(url -> System.out.println(url));
    });

I've gotten rid of the for-each loop, but the resulting code is a mess. I've got multiple, nested subscriptions now! Besides being ugly and hard to modify, it also breaks some critical as-yet undiscovered features of RxJava1. Ugh.

A Better Way

Hold your breath as you view your savior: flatMap().

Observable.flatMap() takes the emissions of one Observable and returns the emissions of another Observable to take its place. It's the ol' switcheroo: you thought you were getting one stream of items but instead you get another. Here's how it solves this problem:

query("Hello, world!")
    .flatMap(new Func1<List<String>, Observable<String>>() {
        @Override
        public Observable<String> call(List<String> urls) {
            return Observable.from(urls);
        }
    })
    .subscribe(url -> System.out.println(url));

I'm showing the full function just so you can see exactly what happened, but simplified with lambdas it looks awesome:

query("Hello, world!")
    .flatMap(urls -> Observable.from(urls))
    .subscribe(url -> System.out.println(url));

flatMap() is weird, right? Why is it returning another Observable? The key concept here is that the new Observable returned is what the Subscriber sees. It doesn't receive a List<String> - it gets a series of individual Strings as returned by Observable.from().

For the record, this part was the hardest for me to understand, but once I had the "aha" moment a lot of RxJava clicked.

It Gets Even Better

I can't emphasize this idea enough: flatMap() can return any Observable it wants.

Suppose I've got a second method available:

// Returns the title of a website, or null if 404
Observable<String> getTitle(String URL);

Instead of printing the URLs, now I want to print the title of each website received. But there's a few issues: my method only works on a single URL at a time, and it doesn't return a String, it returns an Observable that emits the String.

With flatMap(), solving this problem is easy; after splitting the list of URLs into individual items, I can use getTitle() in flatMap() for each url before it reaches the Subscriber:

query("Hello, world!")
    .flatMap(urls -> Observable.from(urls))
    .flatMap(new Func1<String, Observable<String>>() {
        @Override
        public Observable<String> call(String url) {
            return getTitle(url);
        }
    })
    .subscribe(title -> System.out.println(title));

And once more, simplified via lambdas:

query("Hello, world!")
    .flatMap(urls -> Observable.from(urls))
    .flatMap(url -> getTitle(url))
    .subscribe(title -> System.out.println(title));

Crazy, right? I'm composing multiple independent methods returning Observables together! How cool is that!

Not only that, but notice how I'm combining two API calls into a single chain. We could do it for any number of API calls. You know how much of a pain in the ass it is to keep all your API calls synced, having to link their callbacks together before presenting the data? We've skipped the trip to callback hell; all that same logic is now encased in this short reactive call2.

Operators Galore

We've only looked at two operators so far, but there are so many more! How else can we improve our code?

getTitle() returns null if the URL 404s. We don't want to output "null"; it turns out we can filter them out!

query("Hello, world!")
    .flatMap(urls -> Observable.from(urls))
    .flatMap(url -> getTitle(url))
    .filter(title -> title != null)
    .subscribe(title -> System.out.println(title));

filter() emits the same item it received, but only if it passes the boolean check.

And now we want to only show 5 results at most:

query("Hello, world!")
    .flatMap(urls -> Observable.from(urls))
    .flatMap(url -> getTitle(url))
    .filter(title -> title != null)
    .take(5)
    .subscribe(title -> System.out.println(title));

take() emits, at most, the number of items specified. (If there are fewer than 5 titles it'll just stop early.)

Now we want to save each title to disk along the way:

query("Hello, world!")
    .flatMap(urls -> Observable.from(urls))
    .flatMap(url -> getTitle(url))
    .filter(title -> title != null)
    .take(5)
    .doOnNext(title -> saveTitle(title))
    .subscribe(title -> System.out.println(title));

doOnNext() allows us to add extra behavior each time an item is emitted, in this case saving the title.

Look at how easy it is to manipulate the stream of data. You can keep adding more and more ingredients to your recipe and not mess anything up.

RxJava comes with a ton of operators. It is intimidating how many operators there are, but it's worth reviewing so you know what's available. It will take a while to internalize the operators but you'll have true power at your fingertips once you do.

On top of all that's provided, you can even write your own custom operators! That's outside the scope of this article, but basically, if you can think it, you can do it3.

So What?

Alright, so you're a hard sell. You're a skeptic. Why should you care about all these operators?

Key idea #3: Operators let you do anything to the stream of data.

The only limit is yourself.

You can setup complex logic using nothing but chains of simple operators. It breaks down your code into composable bits and pieces. That's functional reactive programming. The more you use it, the more it changes the way you think about programs.

Plus, think about how simple our data was to consume once transformed. By the end of our example we were doing two API calls, manipulating the data, then saving it to disk. But our Subscriber doesn't know that; it just thinks it's consuming a simple Observable<String>. Encapsulation makes coding easier!

In part 3 we'll cover some of the other cool features of RxJava that aren't as directly involved with manipulating data, like error handling and concurrency.

Continue onwards to part 3


1 The way RxJava does error handling, threading, and subscription cancellation wouldn't work at all with this code. I'll get to that in part 3.

2 You may be wondering about the other part of callback hell that is error handling. I'll be addressing that in part 3.

3 If you want to implement your own operators, check out this wiki page, though some implementation details won't make sense until part 3.

comments powered by Disqus