Don't break the chain: use RxJava's compose() operator
One nice aspect of RxJava is that you can see how data is transformed through a series of operators:
Observable.from(someSource)
    .map(data -> manipulate(data))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(data -> doSomething(data));
What if you have a set of operators that you want to reuse for multiple streams? For example, I frequently use subscribeOn() and observeOn() because I want to process data in a worker thread then subscribe to it on the main thread. It'd be great if I could apply this logic to all my streams in a consistent, reusable manner.
The Bad Way
The following is the anti-pattern I used for many months and is bad (and I feel bad).
First, create a method that applies the schedulers:
<T> Observable<T> applySchedulers(Observable<T> observable) {
    return observable.subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
}
Then, wrap your Observable chain:
applySchedulers(
    Observable.from(someSource)
        .map(data -> manipulate(data))
    )
    .subscribe(data -> doSomething(data));
The code works, but it's ugly and confusing: what does applySchedulers() actually apply to? It's no longer a series of operators, so it's hard to follow. There's no way to format this code so it isn't awkward.
Now, just imagine how bad this anti-pattern gets when you use it multiple times for a single stream. *shudder*
Introducing Transformers
The wise people behind RxJava realized this would be a problem and provided a solution: Transformer, which is used with Observable.compose().
Transformer is actually just Func1<Observable<T>, Observable<R>>. In other words: feed it an Observable of one type and it'll return an Observable of another. That's exactly the same as calling a series of operators inline.
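To see why a Transformer is nothing more than a function from one stream to another, here is a minimal plain-Java sketch that needs no RxJava on the classpath. MiniStream and MiniTransformers are hypothetical names invented for this illustration (MiniStream is an eager stand-in for Observable, not the RxJava class), and java.util.function.Function plays the role of Func1. The only point is that compose() hands the whole stream to the function and returns whatever comes back.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for Observable; NOT the RxJava class.
final class MiniStream<T> {
    private final List<T> items;

    private MiniStream(List<T> items) {
        this.items = items;
    }

    static <T> MiniStream<T> from(List<T> source) {
        return new MiniStream<>(new ArrayList<>(source));
    }

    // Per-item operator, analogous to Observable.map().
    <R> MiniStream<R> map(Function<? super T, ? extends R> mapper) {
        List<R> out = new ArrayList<>();
        for (T item : items) {
            out.add(mapper.apply(item));
        }
        return new MiniStream<>(out);
    }

    // Whole-stream operator: apply the function to this stream itself.
    <R> MiniStream<R> compose(Function<MiniStream<T>, MiniStream<R>> transformer) {
        return transformer.apply(this);
    }

    List<T> toList() {
        return new ArrayList<>(items);
    }
}

final class MiniTransformers {
    // A reusable series of operators packaged as a single function.
    static Function<MiniStream<String>, MiniStream<Integer>> trimmedLengths() {
        return stream -> stream.map(String::trim).map(String::length);
    }
}
```

With this in place, MiniStream.from(someList).compose(MiniTransformers.trimmedLengths()) reads top to bottom like any other operator chain, which is the property the real compose() preserves.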
Let's write a method that creates a schedulers Transformer:
<T> Transformer<T, T> applySchedulers() {
    return new Transformer<T, T>() {
        @Override
        public Observable<T> call(Observable<T> observable) {
            return observable.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread());
        }
    };
}
If we're using lambdas this looks a lot prettier:
<T> Transformer<T, T> applySchedulers() {
    return observable -> observable.subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
}
Regardless, let's see how our code looks now:
Observable.from(someSource)
    .map(data -> manipulate(data))
    .compose(applySchedulers())
    .subscribe(data -> doSomething(data));
That's much better! We've got reusable code and the chain is preserved.
Update 3/11/2015: If you are compiling on JDK 7 or below, you'll have to do a bit of extra work to make compose() work with generics. In particular, you have to tell the compiler the return type, like this:
Observable.from(someSource)
    .map(data -> manipulate(data))
    .compose(this.<YourType>applySchedulers())
    .subscribe(data -> doSomething(data));
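The this.<YourType> syntax is an explicit type argument. The same situation comes up in plain Java, so here is a small sketch using only java.util; the TypeWitnessDemo class and its methods are made up for illustration. A JDK 7 compiler infers the type of a generic call without looking at the method it is being passed to, so the explicit argument is needed there; newer compilers can infer it but accept the explicit form as well.

```java
import java.util.Collections;
import java.util.List;

final class TypeWitnessDemo {
    // Hypothetical method that insists on a List<String>.
    static int countNames(List<String> names) {
        return names.size();
    }

    static int countNoNames() {
        // Explicit type argument: tells the compiler that T is String.
        // Without it, JDK 7 infers List<Object> and rejects the call.
        return countNames(Collections.<String>emptyList());
    }
}
```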
Reusing Transformers
In the previous example I used methods that create a new Transformer instance on each invocation. You could instead create a single shared instance and save yourself the unnecessary object instantiation. Transformers are about code reuse, after all.
If you're always transforming from one concrete type to another it's fairly simple to create an instance:
Transformer<String, String> myTransformer = new Transformer<String, String>() {
    // ...Do your work here...
};
What about with our scheduler Transformer? It doesn't care about the type at all, but you can't define a generic instance:
// Doesn't compile; where would T come from?
Transformer<T, T> myTransformer;
You could make it of type Transformer<Object, Object>, but then the resulting Observable would lose its type information and be less useful.
To solve this problem I took a hint from Collections, which has a bunch of type-safe, immutable empty collection creation methods (e.g. Collections.emptyList()). Internally it uses a non-generic instance and wraps it in a method that adds the generics.
Here's how we'd define our schedulers Transformer instance:
final Transformer schedulersTransformer =
    observable -> observable.subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());

@SuppressWarnings("unchecked")
<T> Transformer<T, T> applySchedulers() {
    return (Transformer<T, T>) schedulersTransformer;
}
Now we've only got one instantiation - great!
A warning: Whenever you do unchecked casts you can run into trouble. Make sure that your Transformer is truly type agnostic. Otherwise there is the potential that you could run into a ClassCastException at runtime, even though your code compiles. In this case, we know it's safe because schedulers don't even interact with the emitted items.
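The hazard is easy to reproduce without RxJava. The sketch below uses java.util.function.Function in place of Transformer; SharedFunctions and its members are hypothetical names for illustration only. Both factory methods use the same shared-instance-plus-unchecked-cast trick, but only the first is type agnostic. The second compiles just as cleanly and fails at runtime.

```java
import java.util.function.Function;

final class SharedFunctions {
    // Type agnostic: never inspects or replaces the value it is given.
    private static final Function<Object, Object> PASS_THROUGH = value -> value;

    // NOT type agnostic: always produces a String, whatever T claims to be.
    private static final Function<Object, Object> STRINGIFY = value -> String.valueOf(value);

    // Safe: the shared instance returns exactly what it received.
    @SuppressWarnings("unchecked")
    static <T> Function<T, T> passThrough() {
        return (Function<T, T>) (Function<?, ?>) PASS_THROUGH;
    }

    // Unsafe: the cast promises a T but the function hands back a String.
    @SuppressWarnings("unchecked")
    static <T> Function<T, T> unsafeStringify() {
        return (Function<T, T>) (Function<?, ?>) STRINGIFY;
    }

    // Returns true if the mismatched cast blows up at runtime.
    static boolean stringifyFailsForIntegers() {
        try {
            // The compiler-inserted cast to Integer fails on this line.
            Integer result = SharedFunctions.<Integer>unsafeStringify().apply(42);
            return false; // only reached if no exception was thrown
        } catch (ClassCastException expected) {
            return true;
        }
    }
}
```

Note that the exception surfaces at the call site that consumes the value, not at the cast itself, which is what makes this kind of bug hard to track down.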
What About flatMap()?
At this point, you may be wondering what the difference is between using compose() and flatMap(). They both emit Observable<R>, which means both can reuse a series of operators, right?
The difference is that compose() is a higher level abstraction: it operates on the entire stream, not individually emitted items. In more specific terms:
- compose() is the only way to get the original Observable<T> from the stream. Therefore, operators that affect the whole stream (like subscribeOn() and observeOn()) need to use compose().

  In contrast, if you put subscribeOn()/observeOn() in flatMap(), it would only affect the Observable you create in flatMap() but not the rest of the stream.

- compose() executes immediately when you create the Observable stream, as if you had written the operators inline. flatMap() executes when its onNext() is called, each time it is called. In other words, flatMap() transforms each item, whereas compose() transforms the whole stream.

- flatMap() is necessarily less efficient because it has to create a new Observable every time onNext() is called. compose() operates on the stream as it is.
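The once-per-stream versus once-per-item distinction can be sketched in plain Java by counting how often each function is invoked. TinyStream and ComposeVsFlatMap are hypothetical names for this illustration; TinyStream is an eager, synchronous stand-in and not the RxJava Observable, so it does not model schedulers or the fact that the real flatMap() only runs once items are emitted to a subscriber.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical, eager stand-in for Observable; NOT the RxJava class.
final class TinyStream<T> {
    private final List<T> items;

    private TinyStream(List<T> items) {
        this.items = items;
    }

    @SafeVarargs
    static <T> TinyStream<T> just(T... values) {
        return new TinyStream<>(new ArrayList<>(Arrays.asList(values)));
    }

    <R> TinyStream<R> map(Function<T, R> mapper) {
        List<R> out = new ArrayList<>();
        for (T item : items) {
            out.add(mapper.apply(item));
        }
        return new TinyStream<>(out);
    }

    // The function receives the stream itself, exactly once.
    <R> TinyStream<R> compose(Function<TinyStream<T>, TinyStream<R>> transformer) {
        return transformer.apply(this);
    }

    // The function receives each item and builds a new stream per item.
    <R> TinyStream<R> flatMap(Function<T, TinyStream<R>> mapper) {
        List<R> out = new ArrayList<>();
        for (T item : items) {
            out.addAll(mapper.apply(item).items);
        }
        return new TinyStream<>(out);
    }

    List<T> toList() {
        return new ArrayList<>(items);
    }
}

final class ComposeVsFlatMap {
    // Counts invocations of the function handed to compose().
    static int composeCalls() {
        AtomicInteger calls = new AtomicInteger();
        Function<TinyStream<Integer>, TinyStream<Integer>> doubleAll = stream -> {
            calls.incrementAndGet();
            return stream.map(n -> n * 2);
        };
        TinyStream.just(1, 2, 3).compose(doubleAll);
        return calls.get();
    }

    // Counts invocations of the function handed to flatMap().
    static int flatMapCalls() {
        AtomicInteger calls = new AtomicInteger();
        Function<Integer, TinyStream<Integer>> doubleOne = n -> {
            calls.incrementAndGet();
            return TinyStream.just(n * 2);
        };
        TinyStream.just(1, 2, 3).flatMap(doubleOne);
        return calls.get();
    }
}
```

For a three-item stream, the compose() function runs once while the flatMap() function runs three times and allocates three intermediate streams, which mirrors the efficiency point in the last bullet.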
If you want to replace some operators with reusable code, use compose(). flatMap() has many uses but this is not one of them.
Many thanks to David Gross for reviewing this article.
"Chains break by the weakest link" by Hernán Piñera is licensed under CC BY-SA 2.0