Multicasting in RxJava

Multicasting is a key method for reducing duplicated work in RxJava.

When you multicast an event, you send the same event to all downstream operators/subscribers. This is useful when you're doing an expensive operation like a network request. You don't want to execute an identical network request for each subscriber; you want to execute it once and multicast the result.
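To make the duplication concrete, here is a minimal sketch of what happens without multicasting. It assumes RxJava 3 (the io.reactivex.rxjava3 packages; earlier versions offer the same operators under different package names). The class name ColdDuplication and the fromCallable() stand-in for a network request are hypothetical, chosen only for illustration.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.ArrayList;
import java.util.List;

class ColdDuplication {
    // Returns the log lines in the order they were produced.
    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Hypothetical stand-in for a network request (an assumption, not a real call).
        // No multicasting here, so the body runs once per subscription.
        Observable<String> request = Observable.fromCallable(() -> {
            log.add("Executing request");
            return "Response";
        });

        request.subscribe(s -> log.add("Sub1 got: " + s));
        request.subscribe(s -> log.add("Sub2 got: " + s));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running it logs "Executing request" twice, once for each subscriber, which is exactly the repeated work multicasting is meant to avoid.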

There are two ways to multicast:

  1. Use a ConnectableObservable (via publish() or replay()) [1]
  2. Use a Subject

Any work done before the ConnectableObservable or Subject happens only once; its results are then multicast to all downstream Subscribers.

There is a subtle point here you must recognize: Streams only multicast at the point of the ConnectableObservable or Subject. As a result, any work done after the multicast is duplicated per Subscriber.

Let's look at an example of how this can come into play:

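Observable<String> observable = Observable.just("Event")
    .publish()       // multicast point: work above this line happens once
    .autoConnect(2)  // connect once two Subscribers have subscribed
    .map(s -> {
      // Downstream of publish(), so this runs once per Subscriber
      System.out.println("Expensive operation for " + s);
      return s;
    });

observable.subscribe(s -> System.out.println("Sub1 got: " + s));
observable.subscribe(s -> System.out.println("Sub2 got: " + s));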

// Output:
// Expensive operation for Event
// Sub1 got: Event
// Expensive operation for Event
// Sub2 got: Event

Here we have a ConnectableObservable, an expensive map() operation and two Subscribers. The surprising result is that the expensive map() operation is executed twice, even though we tried to prevent that with publish()!

This chart makes the situation clearer:

[Chart showing why publish() didn't work]

If you actually wanted the map() to happen once, you would need to put it before the publish() call:

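Observable<String> observable = Observable.just("Event")
    .map(s -> {
      // Upstream of publish(), so this runs once in total
      System.out.println("Expensive operation for " + s);
      return s;
    })
    .publish()       // multicast point: the mapped result is shared
    .autoConnect(2); // connect once two Subscribers have subscribed

observable.subscribe(s -> System.out.println("Sub1 got: " + s));
observable.subscribe(s -> System.out.println("Sub2 got: " + s));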

// Output:
// Expensive operation for Event
// Sub1 got: Event
// Sub2 got: Event

Here's an updated chart:

[Chart with correct publish() usage]

What's the lesson here? If you're relying on multicasting to reduce work, make sure you multicast at the right point.

For better or worse, a lot of people are using Subjects. One advantage is that they are multicast, but you have to remember that they only multicast at the point where they emit. If you have a bunch of expensive operators applied downstream of a Subject, consider adding another publish() after those operators so that their work is shared too.
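As a rough sketch of that advice, the example below puts an expensive map() downstream of a PublishSubject and then multicasts again with publish().autoConnect(2). It assumes RxJava 3 package names; the class name SubjectDownstream and the log list are hypothetical and exist only for illustration.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.List;

class SubjectDownstream {
    // Returns the log lines in the order they were produced.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        PublishSubject<String> subject = PublishSubject.create();

        // map() sits downstream of the Subject, so on its own it would run
        // once per subscriber. The extra publish() makes it run once in total.
        Observable<String> shared = subject
            .map(s -> {
                log.add("Expensive operation for " + s);
                return s;
            })
            .publish()
            .autoConnect(2); // subscribes to the Subject once two subscribers arrive

        shared.subscribe(s -> log.add("Sub1 got: " + s));
        shared.subscribe(s -> log.add("Sub2 got: " + s));

        subject.onNext("Event");
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

With the second publish() in place, the expensive operation is logged once and both subscribers receive the event. Removing publish().autoConnect(2) and subscribing to the mapped stream directly would run the map() once per subscriber.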


[1] share() and cache() are also options, but they are basically just shortcuts around ConnectableObservable: share() is just publish().refCount(), and cache() can be recreated by using replay().autoConnect().
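To illustrate the cache() half of this footnote, the sketch below runs the same two-subscriber scenario through cache() and through replay().autoConnect(). It assumes RxJava 3 package names; the class name CacheShortcut, the useCache flag, and the fromCallable() stand-in for expensive work are hypothetical.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.ArrayList;
import java.util.List;

class CacheShortcut {
    // Runs two subscribers against either cache() or replay().autoConnect()
    // and returns the log lines in the order they were produced.
    static List<String> run(boolean useCache) {
        List<String> log = new ArrayList<>();

        // Hypothetical stand-in for expensive work such as a network request.
        Observable<String> request = Observable.fromCallable(() -> {
            log.add("Executing request");
            return "Response";
        });

        Observable<String> multicast = useCache
            ? request.cache()
            : request.replay().autoConnect(); // connects on the first subscriber

        multicast.subscribe(s -> log.add("Sub1 got: " + s));
        multicast.subscribe(s -> log.add("Sub2 got: " + s)); // late subscriber gets the replayed value
        return log;
    }

    public static void main(String[] args) {
        run(true).forEach(System.out::println);
        run(false).forEach(System.out::println);
    }
}
```

In this scenario both variants execute the request once and deliver the stored response to the second subscriber, even though it subscribes after the source has already completed.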
