Multicasting in RxJava
Multicasting is a key method for reducing duplicated work in RxJava.
When you multicast an event, you send the same event to all downstream operators/subscribers. This is useful when you're doing an expensive operation like a network request. You don't want to repeatedly execute identical network requests for each subscriber; you want to execute one request, then multicast the results.
There are two ways to multicast:
- Use a ConnectableObservable (via publish() or replay() [1])
- Use a Subject
Any work done before the ConnectableObservable or Subject happens only once; the result of that work is then multicast to all downstream Subscribers.
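To make the "happens only once" claim concrete, here is a minimal sketch that counts how often an upstream source runs with and without publish(). It assumes RxJava 3 package names (older versions use different packages); fetch() and the class name are hypothetical stand-ins, not part of any library.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.observables.ConnectableObservable;
import java.util.concurrent.atomic.AtomicInteger;

class UpstreamWorkOnce {
    // Counts invocations of the (hypothetical) expensive call.
    static final AtomicInteger calls = new AtomicInteger();

    // Hypothetical stand-in for an expensive operation such as a network request.
    static String fetch() {
        calls.incrementAndGet();
        return "Response";
    }

    // Two subscribers on a plain Observable: fetch() runs once per subscriber.
    static int withoutMulticast() {
        calls.set(0);
        Observable<String> source = Observable.fromCallable(UpstreamWorkOnce::fetch);
        source.subscribe(s -> {});
        source.subscribe(s -> {});
        return calls.get();
    }

    // Two subscribers on a ConnectableObservable: fetch() runs once, on connect().
    static int withPublish() {
        calls.set(0);
        ConnectableObservable<String> source =
                Observable.fromCallable(UpstreamWorkOnce::fetch).publish();
        source.subscribe(s -> {});
        source.subscribe(s -> {});
        source.connect(); // nothing is emitted until this call
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("Without multicast: " + withoutMulticast() + " call(s)");
        System.out.println("With publish(): " + withPublish() + " call(s)");
    }
}
```

Both subscribers are attached before connect() is called, so they share the single upstream execution.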
There is a subtle point here you must recognize: streams only multicast at the point of the ConnectableObservable or Subject. As a result, any work done after the multicast is duplicated per Subscriber.
Let's look at an example of how this can come into play:
Observable<String> observable = Observable.just("Event")
    .publish()
    .autoConnect(2)
    .map(s -> {
        System.out.println("Expensive operation for " + s);
        return s;
    });
observable.subscribe(s -> System.out.println("Sub1 got: " + s));
observable.subscribe(s -> System.out.println("Sub2 got: " + s));
// Output:
// Expensive operation for Event
// Sub1 got: Event
// Expensive operation for Event
// Sub2 got: Event
Here we have a ConnectableObservable, an expensive map() operation, and two Subscribers. The surprising result is that the expensive map() operation is executed twice, even though we tried to prevent that with publish()!
This chart makes the situation clearer:
If you actually wanted the map() to happen once, you would need to put it before the publish() call:
Observable<String> observable = Observable.just("Event")
    .map(s -> {
        System.out.println("Expensive operation for " + s);
        return s;
    })
    .publish()
    .autoConnect(2);
observable.subscribe(s -> System.out.println("Sub1 got: " + s));
observable.subscribe(s -> System.out.println("Sub2 got: " + s));
// Output:
// Expensive operation for Event
// Sub1 got: Event
// Sub2 got: Event
Here's an updated chart:
What's the lesson here? If you're relying on multicasting to reduce work, make sure you multicast at the right point.
For better or worse, a lot of people are using Subjects. One advantage is that they are multicast, but you have to remember that they only multicast at the point where they emit. If you have a bunch of expensive operators applied downstream of the Subject, then you should consider adding another publish() somewhere downstream of those operators.
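The Subject case can be sketched the same way. The example below assumes RxJava 3 package names; expensive() and the class name are hypothetical. It counts how many times a map() placed downstream of a PublishSubject runs for a single event, first on its own and then with a publish().autoConnect(2) added after it.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.subjects.PublishSubject;
import java.util.concurrent.atomic.AtomicInteger;

class SubjectDownstreamWork {
    // Counts how often the (hypothetical) expensive mapping runs.
    static final AtomicInteger mapCalls = new AtomicInteger();

    static String expensive(String s) {
        mapCalls.incrementAndGet();
        return s.toUpperCase();
    }

    // map() sits after the Subject, so each subscriber gets its own map().
    static int mapOnly() {
        mapCalls.set(0);
        PublishSubject<String> subject = PublishSubject.create();
        Observable<String> stream = subject.map(SubjectDownstreamWork::expensive);
        stream.subscribe(s -> {});
        stream.subscribe(s -> {});
        subject.onNext("event");
        return mapCalls.get();
    }

    // A second multicast point after map() lets both subscribers share its result.
    static int mapThenPublish() {
        mapCalls.set(0);
        PublishSubject<String> subject = PublishSubject.create();
        Observable<String> stream = subject
                .map(SubjectDownstreamWork::expensive)
                .publish()
                .autoConnect(2);
        stream.subscribe(s -> {});
        stream.subscribe(s -> {});
        subject.onNext("event");
        return mapCalls.get();
    }

    public static void main(String[] args) {
        System.out.println("map() only: " + mapOnly() + " call(s)");
        System.out.println("map() then publish(): " + mapThenPublish() + " call(s)");
    }
}
```

The Subject multicasts the raw event in both cases; only the position of the second multicast point decides whether the mapping is shared.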
[1] share() and cache() are also options, but they are basically just shortcuts around ConnectableObservable. share() is just publish().refCount(), and cache() can be recreated by using replay().autoConnect().
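As a rough illustration of the cache() shortcut, the sketch below subscribes twice, one subscriber after the other, to a source wrapped in cache() and to the same source wrapped in replay().autoConnect(), and counts upstream calls. It assumes RxJava 3 package names; fetch(), subscribeTwice(), and the class name are hypothetical helpers.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class ShortcutOperators {
    // Counts invocations of the (hypothetical) expensive call.
    static final AtomicInteger calls = new AtomicInteger();

    static String fetch() {
        calls.incrementAndGet();
        return "Response";
    }

    // Subscribes twice in sequence and records what each subscriber received.
    static List<String> subscribeTwice(Observable<String> source) {
        List<String> seen = new ArrayList<>();
        source.subscribe(s -> seen.add("Sub1 got: " + s));
        source.subscribe(s -> seen.add("Sub2 got: " + s));
        return seen;
    }

    // cache(): the first subscription triggers fetch(); later ones get a replay.
    static int callsWithCache() {
        calls.set(0);
        subscribeTwice(Observable.fromCallable(ShortcutOperators::fetch).cache());
        return calls.get();
    }

    // The longhand form from the footnote: replay() plus autoConnect().
    static int callsWithReplayAutoConnect() {
        calls.set(0);
        subscribeTwice(Observable.fromCallable(ShortcutOperators::fetch)
                .replay()
                .autoConnect());
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("cache(): " + callsWithCache() + " call(s)");
        System.out.println("replay().autoConnect(): "
                + callsWithReplayAutoConnect() + " call(s)");
    }
}
```

In both variants the second subscriber arrives after the source has already finished, yet it still receives the buffered value without triggering a second fetch().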