Grokking RxJava, Part 1: The Basics
RxJava is the new hotness amongst Android developers these days. The only problem is that it can be difficult to approach initially. Functional Reactive Programming is hard when you come from an imperative world, but once you understand it, it's so awesome!
I'm here to try to give you a flavor of RxJava. The goal of this four-part series is to get your foot in the door. I'm not going to try to explain everything (nor could I). I just want you to become interested in RxJava and how it works.
The Basics
The basic building blocks of reactive code are Observables and Subscribers[1]. An Observable emits items; a Subscriber consumes those items.
There is a pattern to how items are emitted. An Observable may emit any number of items (including zero items), then it terminates either by successfully completing, or due to an error. For each Subscriber it has, an Observable calls Subscriber.onNext() any number of times, followed by either Subscriber.onCompleted() or Subscriber.onError().
This looks a lot like your standard observer pattern, but it differs in one key way - Observables often don't start emitting items until someone explicitly subscribes to them[2]. In other words: if no one is there to listen, the tree won't fall in the forest.
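To make the "nothing happens until you subscribe" idea concrete, here is a minimal sketch in plain Java. It does not use RxJava at all: ToyObserver and ToyObservable are hypothetical names I made up for illustration, and the real library does far more than this. The only point is the ordering in the log: the emission logic runs when subscribe() is called, not when the source is created, and the items are followed by a single terminal call.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a cold source. NOT RxJava: ToyObserver and ToyObservable are
// hypothetical types invented for this sketch.
class ColdSketch {

    /** Receives zero or more items, then one terminal signal. */
    interface ToyObserver<T> {
        void onNext(T item);
        void onCompleted();
        void onError(Throwable e);
    }

    /** Holds the emission logic and runs it once per subscribe() call. */
    interface ToyObservable<T> {
        void subscribe(ToyObserver<T> observer);
    }

    /** Builds a source, subscribes once, and returns the order of events. */
    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Defining the source only stores this lambda; nothing is emitted yet.
        ToyObservable<String> source = observer -> {
            log.add("source started");
            observer.onNext("Hello");
            observer.onNext("world");
            observer.onCompleted();
        };
        log.add("source created");

        // Subscribing is what triggers the emission logic above.
        source.subscribe(new ToyObserver<String>() {
            @Override public void onNext(String item) { log.add("onNext: " + item); }
            @Override public void onCompleted() { log.add("onCompleted"); }
            @Override public void onError(Throwable e) { log.add("onError: " + e); }
        });
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Notice that "source created" is logged before "source started": the tree only falls once someone is listening.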
Hello, World!
Let's see this framework in action with a concrete example. First, let's create a basic Observable:
Observable<String> myObservable = Observable.create(
    new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> sub) {
            sub.onNext("Hello, world!");
            sub.onCompleted();
        }
    }
);
Our Observable emits "Hello, world!" then completes. Now let's create a Subscriber to consume the data:
Subscriber<String> mySubscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) { System.out.println(s); }

    @Override
    public void onCompleted() { }

    @Override
    public void onError(Throwable e) { }
};
All this does is print each String emitted by the Observable.
Now that we've got myObservable and mySubscriber, we can hook them up to each other using subscribe():
myObservable.subscribe(mySubscriber);
// Outputs "Hello, world!"
When the subscription is made, myObservable calls the subscriber's onNext() and onCompleted() methods. As a result, mySubscriber outputs "Hello, world!" then terminates.
Simpler Code
That's a lot of boilerplate code just to say "Hello, world!" That's because I took the verbose route so you could see exactly what's happening. There are lots of shortcuts provided in RxJava to make coding easier.
First, let's simplify our Observable. RxJava has multiple built-in Observable creation methods for common tasks. In this case, Observable.just() emits a single item then completes, just like our code above[3]:
Observable<String> myObservable =
    Observable.just("Hello, world!");
Next, let's handle that unnecessarily verbose Subscriber. We don't care about onCompleted() or onError(), so instead we can use a simpler class to define what to do during onNext():
Action1<String> onNextAction = new Action1<String>() {
    @Override
    public void call(String s) {
        System.out.println(s);
    }
};
Actions can define each part of a Subscriber. Observable.subscribe() can handle one, two or three Action parameters that take the place of onNext(), onError(), and onCompleted(). Replicating our Subscriber from before looks like this:
myObservable.subscribe(onNextAction, onErrorAction, onCompleteAction);
However, we only need the first parameter, because we're ignoring onError() and onCompleted():
myObservable.subscribe(onNextAction);
// Outputs "Hello, world!"
Now, let's get rid of those variables by just chaining the method calls together:
Observable.just("Hello, world!")
    .subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            System.out.println(s);
        }
    });
Finally, let's use Java 8 lambdas to get rid of that ugly Action1 code.
Observable.just("Hello, world!")
    .subscribe(s -> System.out.println(s));
If you're on Android (and thus can't use Java 8), I highly recommend using retrolambda; it will cut down on the verbosity of your code immensely.
Transformation
Let's spice things up.
Suppose I want to append my signature to the "Hello, world!" output. One possibility would be to change the Observable:
Observable.just("Hello, world! -Dan")
    .subscribe(s -> System.out.println(s));
This works if you have control over your Observable, but there's no guarantee that will be the case - what if you're using someone else's library? Another potential problem: what if I use my Observable in multiple places but only sometimes want to add the signature?
How about we try modifying our Subscriber instead:
Observable.just("Hello, world!")
    .subscribe(s -> System.out.println(s + " -Dan"));
This answer is also unsatisfactory, but for different reasons: I want my Subscribers to be as lightweight as possible because I might be running them on the main thread. On a more conceptual level, Subscribers are supposed to be the thing that reacts, not the thing that mutates.
Wouldn't it be cool if I could transform "Hello, world!" with some intermediary step?
Introducing Operators
Here's how we're going to solve the item transformation problems: with operators. Operators can be used in between the source Observable and the ultimate Subscriber to manipulate emitted items. RxJava comes with a huge collection of operators, but it's best to focus on just a handful at first.
For this situation, the map() operator can be used to transform one emitted item into another:
Observable.just("Hello, world!")
    .map(new Func1<String, String>() {
        @Override
        public String call(String s) {
            return s + " -Dan";
        }
    })
    .subscribe(s -> System.out.println(s));
Again, we can simplify this by using lambdas:
Observable.just("Hello, world!")
    .map(s -> s + " -Dan")
    .subscribe(s -> System.out.println(s));
Pretty cool, eh? Our map() operator is basically an Observable that transforms an item. We can chain as many map() calls as we want together, polishing the data into a perfect, consumable form for our end Subscriber.
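If "an Observable that transforms an item" sounds abstract, here is one way it could work, sketched in plain Java. This is not how RxJava implements map(): ToySource is a hypothetical type of my own, and it only models onNext() (no completion or error signals). The idea it shows is that map() returns a new source which, when subscribed to, subscribes to the original and forwards each item after applying the function.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Toy model of map(). NOT RxJava: ToySource is a hypothetical type for this
// sketch, and it only models onNext().
class MapSketch {

    interface ToySource<T> {
        void subscribe(Consumer<T> onNext);

        /** Emits a single item to each subscriber. */
        static <T> ToySource<T> just(T item) {
            return onNext -> onNext.accept(item);
        }

        /** Wraps this source in a new one that forwards transformed items. */
        default <R> ToySource<R> map(Function<T, R> transform) {
            return downstream ->
                    this.subscribe(item -> downstream.accept(transform.apply(item)));
        }
    }

    /** Runs the signature example and collects what the subscriber receives. */
    static List<String> run() {
        List<String> received = new ArrayList<>();
        ToySource.just("Hello, world!")
                .map(s -> s + " -Dan")
                .subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because map() hands back another ToySource, you can call map() on the result again, which is why chaining works.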
More on map()
Here's an interesting aspect of map(): it does not have to emit items of the same type as the source Observable!
Suppose my Subscriber is not interested in outputting the original text, but instead wants to output the hash of the text:
Observable.just("Hello, world!")
    .map(new Func1<String, Integer>() {
        @Override
        public Integer call(String s) {
            return s.hashCode();
        }
    })
    .subscribe(i -> System.out.println(Integer.toString(i)));
Interesting - we started with a String but our Subscriber receives an Integer. Again, we can use lambdas to shorten this code:
Observable.just("Hello, world!")
    .map(s -> s.hashCode())
    .subscribe(i -> System.out.println(Integer.toString(i)));
Like I said before, we want our Subscriber to do as little as possible. Let's throw in another map() to convert our hash back into a String:
Observable.just("Hello, world!")
    .map(s -> s.hashCode())
    .map(i -> Integer.toString(i))
    .subscribe(s -> System.out.println(s));
Would you look at that - our Observable and Subscriber are back to their original code! We just added some transformational steps in between. We could even add my signature transformation back in as well:
Observable.just("Hello, world!")
    .map(s -> s + " -Dan")
    .map(s -> s.hashCode())
    .map(i -> Integer.toString(i))
    .subscribe(s -> System.out.println(s));
So What?
At this point you might be thinking "that's a lot of fancy footwork for some simple code." True enough; it's a simple example. But there are two ideas you should take away:
Key idea #1: Observable and Subscriber can do anything.
Let your imagination run wild! Anything is possible.
Your Observable could be a database query, the Subscriber taking the results and displaying them on the screen. Your Observable could be a click on the screen, the Subscriber reacting to it. Your Observable could be a stream of bytes read from the internet, the Subscriber writing it to the disk.
It's a general framework that can handle just about any problem.
Key idea #2: The Observable and Subscriber are independent of the transformational steps in between them.
I can stick as many map() calls as I want in between the original source Observable and its ultimate Subscriber. The system is highly composable: it is easy to manipulate the data. As long as the operators work with the correct input/output data I could make a chain that goes on forever[4].
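One way to see the "correct input/output data" requirement is with plain function composition from the JDK. This is only an analogy, not RxJava: the class and constant names below are hypothetical, and java.util.function.Function.andThen() stands in for a chain of map() calls. Each stage is defined on its own, and the compiler only lets you link stages whose types line up.

```java
import java.util.function.Function;

// Plain-JDK analogy for a chain of map() calls; no RxJava involved.
// The class and constant names are made up for this sketch.
class ChainSketch {

    // Three independent stages, each with its own input and output type.
    static final Function<String, String> SIGN = s -> s + " -Dan";
    static final Function<String, Integer> HASH = String::hashCode;
    static final Function<Integer, String> SHOW = i -> Integer.toString(i);

    // Each stage's output type must match the next stage's input type,
    // otherwise this line does not compile.
    static final Function<String, String> PIPELINE =
            SIGN.andThen(HASH).andThen(SHOW);

    public static void main(String[] args) {
        // Prints the hash of the signed greeting as text.
        System.out.println(PIPELINE.apply("Hello, world!"));
    }
}
```

Swapping HASH and SHOW would fail to compile, because SHOW expects an Integer and SIGN produces a String. Neither end of the pipeline has to change when a stage is added in the middle, as long as the types still match.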
Combine our two key ideas and you can see a system with a lot of potential. At this point, though, we only have a single operator, map(), which severely limits our capabilities. In part 2 we'll take a dip into the large pool of operators available to you when using RxJava.
[1] The smallest building block is actually an Observer, but in practice you are most often using Subscriber because that's how you hook up to Observables.
[2] The term used is "hot" vs. "cold" Observables. A hot Observable emits items all the time, even when no one is listening. A cold Observable only emits items when it has a subscriber (and is what I'm using in all my examples). This distinction isn't that important to initially learning RxJava.
[3] Strictly speaking, Observable.just() isn't exactly the same as our initial code, but I won't get to why until part 3.
[4] Okay, not really, since I'll hit the bounds of the machine at some point, but you get the idea.