Grokking RxJava, Part 1: The Basics

RxJava is the new hotness amongst Android developers these days. The only problem is that it can be difficult to approach initially. Functional Reactive Programming is hard when you come from an imperative world, but once you understand it, it's so awesome!

I'm here to try to give you a flavor of RxJava. The goal of this four-part series is to get your foot in the door. I'm not going to try to explain everything (nor could I). I just want you to become interested in RxJava and how it works.

The Basics

The basic building blocks of reactive code are Observables and Subscribers[1]. An Observable emits items; a Subscriber consumes those items.

There is a pattern to how items are emitted. An Observable may emit any number of items (including zero items), then it terminates either by successfully completing, or due to an error. For each Subscriber it has, an Observable calls Subscriber.onNext() any number of times, followed by either Subscriber.onCompleted() or Subscriber.onError().
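To make that contract concrete, here is a minimal sketch in plain Java. It does not use RxJava: TinySubscriber, emitParsed() and record() are hypothetical names I made up so the example compiles on its own. The producer parses each input and emits it, and ends with exactly one terminal call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class EmissionContract {
    // Hypothetical stand-in for RxJava's Subscriber, declared here so the sketch compiles alone.
    interface TinySubscriber<T> {
        void onNext(T item);
        void onCompleted();
        void onError(Throwable e);
    }

    // Parses and emits each input, then signals exactly one terminal event.
    static void emitParsed(List<String> inputs, TinySubscriber<Integer> sub) {
        for (String raw : inputs) {
            int value;
            try {
                value = Integer.parseInt(raw);
            } catch (NumberFormatException e) {
                sub.onError(e);   // failure: stop here, onCompleted() is never called
                return;
            }
            sub.onNext(value);
        }
        sub.onCompleted();        // success: every item was emitted
    }

    // Records the callbacks a subscriber receives so the sequence can be inspected.
    static List<String> record(List<String> inputs) {
        final List<String> events = new ArrayList<>();
        emitParsed(inputs, new TinySubscriber<Integer>() {
            @Override public void onNext(Integer i) { events.add("onNext(" + i + ")"); }
            @Override public void onCompleted() { events.add("onCompleted()"); }
            @Override public void onError(Throwable e) {
                events.add("onError(" + e.getClass().getSimpleName() + ")");
            }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(record(Arrays.asList("1", "2")));      // [onNext(1), onNext(2), onCompleted()]
        System.out.println(record(Arrays.asList("1", "x", "3"))); // [onNext(1), onError(NumberFormatException)]
        System.out.println(record(new ArrayList<String>()));      // [onCompleted()]
    }
}
```

Note the second case: once the error is signaled, "3" is never emitted and the completion callback never fires.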

This looks a lot like your standard observer pattern, but it differs in one key way - Observables often don't start emitting items until someone explicitly subscribes to them[2]. In other words: if no one is there to listen, the tree won't fall in the forest.
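Here is a rough sketch of that "nothing happens until you subscribe" behavior, again in plain Java rather than RxJava. TinyObservable is a hypothetical miniature I'm assuming for illustration; it only stores a producer and runs it when subscribe() is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ColdSketch {
    // Hypothetical miniature of a cold Observable (not RxJava's class):
    // it stores the producer and does nothing until subscribe() is called.
    static final class TinyObservable<T> {
        private final Consumer<Consumer<? super T>> producer;

        TinyObservable(Consumer<Consumer<? super T>> producer) {
            this.producer = producer;
        }

        // Subscribing is what actually runs the producer.
        void subscribe(Consumer<? super T> onNext) {
            producer.accept(onNext);
        }
    }

    static List<String> demo() {
        final List<String> log = new ArrayList<>();
        TinyObservable<String> hello = new TinyObservable<String>(onNext -> {
            log.add("producer ran");
            onNext.accept("Hello, world!");
        });
        log.add("created, nothing emitted yet");
        hello.subscribe(s -> log.add("first got: " + s));
        hello.subscribe(s -> log.add("second got: " + s));
        return log;
    }

    public static void main(String[] args) {
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```

The log starts with "created, nothing emitted yet", and "producer ran" shows up once per subscription: in this sketch each subscriber triggers its own run of the producer.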

Hello, World!

Let's see this framework in action with a concrete example. First, let's create a basic Observable:

Observable<String> myObservable = Observable.create(
    new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> sub) {
            sub.onNext("Hello, world!");
            sub.onCompleted();
        }
    }
);

Our Observable emits "Hello, world!" then completes. Now let's create a Subscriber to consume the data:

Subscriber<String> mySubscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) { System.out.println(s); }

    @Override
    public void onCompleted() { }

    @Override
    public void onError(Throwable e) { }
};

All this does is print each String emitted by the Observable.

Now that we've got myObservable and mySubscriber we can hook them up to each other using subscribe():

myObservable.subscribe(mySubscriber);
// Outputs "Hello, world!"

When the subscription is made, myObservable calls the subscriber's onNext() and onCompleted() methods. As a result, mySubscriber outputs "Hello, world!" then terminates.

Simpler Code

That's a lot of boilerplate code just to say "Hello, world!" That's because I took the verbose route so you could see exactly what's happening. There are lots of shortcuts provided in RxJava to make coding easier.

First, let's simplify our Observable. RxJava has multiple built-in Observable creation methods for common tasks. In this case, Observable.just() emits a single item then completes, just like our code above[3]:

Observable<String> myObservable =
    Observable.just("Hello, world!");

Next, let's handle that unnecessarily verbose Subscriber. We don't care about onCompleted() or onError(), so instead we can use a simpler class to define what to do during onNext():

Action1<String> onNextAction = new Action1<String>() {
    @Override
    public void call(String s) {
        System.out.println(s);
    }
};

Actions can define each part of a Subscriber. Observable.subscribe() can handle one, two or three Action parameters that take the place of onNext(), onError(), and onCompleted(). Replicating our Subscriber from before looks like this (here onErrorAction would be an Action1<Throwable> and onCompleteAction an Action0):

myObservable.subscribe(onNextAction, onErrorAction, onCompleteAction);

However, we only need the first parameter, because we're ignoring onError() and onCompleted():

myObservable.subscribe(onNextAction);
// Outputs "Hello, world!"

Now, let's get rid of those variables by just chaining the method calls together:

Observable.just("Hello, world!")
    .subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            System.out.println(s);
        }
    });

Finally, let's use Java 8 lambdas to get rid of that ugly Action1 code.

Observable.just("Hello, world!")
    .subscribe(s -> System.out.println(s));

If you're on Android (and thus can't use Java 8), I highly recommend using retrolambda; it will cut down on the verbosity of your code immensely.

Transformation

Let's spice things up.

Suppose I want to append my signature to the "Hello, world!" output. One possibility would be to change the Observable:

Observable.just("Hello, world! -Dan")
    .subscribe(s -> System.out.println(s));

This works if you have control over your Observable, but there's no guarantee that will be the case - what if you're using someone else's library? Another potential problem: what if I use my Observable in multiple places but only sometimes want to add the signature?

How about we try modifying our Subscriber instead:

Observable.just("Hello, world!")
    .subscribe(s -> System.out.println(s + " -Dan"));

This answer is also unsatisfactory, but for different reasons: I want my Subscribers to be as lightweight as possible because I might be running them on the main thread. On a more conceptual level, Subscribers are supposed to be the thing that reacts, not the thing that mutates.

Wouldn't it be cool if I could transform "Hello, world!" with some intermediary step?

Introducing Operators

Here's how we're going to solve the item transformation problems: with operators. Operators can be used in between the source Observable and the ultimate Subscriber to manipulate emitted items. RxJava comes with a huge collection of operators, but it's best to focus on just a handful at first.

For this situation, the map() operator can be used to transform one emitted item into another:

Observable.just("Hello, world!")
    .map(new Func1<String, String>() {
        @Override
        public String call(String s) {
            return s + " -Dan";
        }
    })
    .subscribe(s -> System.out.println(s));

Again, we can simplify this by using lambdas:

Observable.just("Hello, world!")
    .map(s -> s + " -Dan")
    .subscribe(s -> System.out.println(s));

Pretty cool, eh? Our map() operator is basically an Observable that transforms an item. We can chain as many map() calls as we want together, polishing the data into a perfect, consumable form for our end Subscriber.
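If "an Observable that transforms an item" sounds abstract, here is one way to picture it. This is a plain-Java sketch under my own assumptions, not how RxJava actually implements map(): TinyObservable is a hypothetical miniature whose map() returns a new TinyObservable that subscribes to the one upstream and applies the function to each item on its way down.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class MapSketch {
    // Hypothetical miniature Observable (not RxJava's class); it only supports onNext.
    static final class TinyObservable<T> {
        private final Consumer<Consumer<? super T>> producer;

        TinyObservable(Consumer<Consumer<? super T>> producer) {
            this.producer = producer;
        }

        // Emits a single item to whoever subscribes.
        static <T> TinyObservable<T> just(T item) {
            return new TinyObservable<T>(onNext -> onNext.accept(item));
        }

        // Returns a NEW observable: when subscribed, it subscribes upstream
        // and hands each item to the downstream consumer after applying fn.
        <R> TinyObservable<R> map(Function<? super T, ? extends R> fn) {
            return new TinyObservable<R>(downstream ->
                    producer.accept(item -> downstream.accept(fn.apply(item))));
        }

        void subscribe(Consumer<? super T> onNext) {
            producer.accept(onNext);
        }
    }

    static List<String> demo() {
        final List<String> out = new ArrayList<>();
        TinyObservable.just("Hello, world!")
                .map(s -> s + " -Dan")      // first transformation
                .map(s -> "[" + s + "]")    // second transformation, chained on the first
                .subscribe(s -> out.add(s));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [[Hello, world! -Dan]]
    }
}
```

Because every map() call hands back another observable, chaining is just calling map() on the result of the previous call; neither the source nor the final subscriber has to change.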

More on map()

Here's an interesting aspect of map(): it does not have to emit items of the same type as the source Observable!

Suppose my Subscriber is not interested in outputting the original text, but instead wants to output the hash of the text:

Observable.just("Hello, world!")
    .map(new Func1<String, Integer>() {
        @Override
        public Integer call(String s) {
            return s.hashCode();
        }
    })
    .subscribe(i -> System.out.println(Integer.toString(i)));

Interesting - we started with a String but our Subscriber receives an Integer. Again, we can use lambdas to shorten this code:

Observable.just("Hello, world!")
    .map(s -> s.hashCode())
    .subscribe(i -> System.out.println(Integer.toString(i)));

Like I said before, we want our Subscriber to do as little as possible. Let's throw in another map() to convert our hash back into a String:

Observable.just("Hello, world!")
    .map(s -> s.hashCode())
    .map(i -> Integer.toString(i))
    .subscribe(s -> System.out.println(s));

Would you look at that - our Observable and Subscriber are back to their original code! We just added some transformational steps in between. We could even add my signature transformation back in as well:

Observable.just("Hello, world!")
    .map(s -> s + " -Dan")
    .map(s -> s.hashCode())
    .map(i -> Integer.toString(i))
    .subscribe(s -> System.out.println(s));

So What?

At this point you might be thinking "that's a lot of fancy footwork for some simple code." True enough; it's a simple example. But there are two ideas you should take away:

Key idea #1: Observable and Subscriber can do anything.

Let your imagination run wild! Anything is possible.

Your Observable could be a database query, the Subscriber taking the results and displaying them on the screen. Your Observable could be a click on the screen, the Subscriber reacting to it. Your Observable could be a stream of bytes read from the internet, the Subscriber writing it to the disk.

It's a general framework that can handle just about any problem.

Key idea #2: The Observable and Subscriber are independent of the transformational steps in between them.

I can stick as many map() calls as I want in between the original source Observable and its ultimate Subscriber. The system is highly composable: it is easy to manipulate the data. As long as the operators work with the correct input/output data I could make a chain that goes on forever[4].

Combine our two key ideas and you can see a system with a lot of potential. At this point, though, we only have a single operator, map(), which severely limits our capabilities. In part 2 we'll take a dip into the large pool of operators available to you when using RxJava.

Continue onwards to part 2


[1] The smallest building block is actually an Observer, but in practice you are most often using Subscriber because that's how you hook up to Observables.

[2] The term used is "hot" vs. "cold" Observables. A hot Observable emits items all the time, even when no one is listening. A cold Observable only emits items when it has a subscriber (and is what I'm using in all my examples). This distinction isn't that important to initially learning RxJava.

[3] Strictly speaking, Observable.just() isn't exactly the same as our initial code, but I won't get to why until part 3.

[4] Okay, not really, since I'll hit the bounds of the machine at some point, but you get the idea.
