ConnectableObservable: So Hot Right Now

ConnectableObservable does not care when you subscribe to it; instead, it only begins its work when connect() is called. With that in mind, what is the output of the following?

val connectableObservable = Observable.just("Hello!").publish()

connectableObservable
  .subscribeOn(Schedulers.io())
  .subscribe { println(it) }

connectableObservable.connect()

If you answered "Hello!" then you'd be right... some of the time. Sometimes, it simply prints nothing.

How can that be? Well, by using subscribeOn(), we are switching to another thread to initiate the subscription, while connect() still runs on the calling thread. Now we've introduced concurrency, thus cracking open the door for race conditions.

Here's the order of events as we expect them to happen:

  1. The subscription is set up and ready to receive data.
  2. The observable connects, subscribes to the underlying stream, and starts emitting data (which our subscription happily receives).

However, if the subscription thread runs slower than the main thread, the opposite can occur:

  1. The observable connects, subscribes to the underlying stream, then emits "Hello!" to no one.
  2. The subscription is set up and ready to receive data, but by now it has missed the boat on the emission!
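
The unlucky ordering can be reproduced without any threads by connecting before subscribing. The following is a minimal sketch, assuming RxJava 3 (the io.reactivex.rxjava3 package); the function name lateSubscriberReceives is hypothetical and exists only for illustration.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Hypothetical helper: returns whatever a late subscriber manages to receive.
fun lateSubscriberReceives(): List<String> {
    val received = mutableListOf<String>()
    val connectableObservable = Observable.just("Hello!").publish()

    // Step 1 of the unlucky ordering: connect while nobody is subscribed.
    // "Hello!" is emitted to no one, and the stream completes.
    connectableObservable.connect()

    // Step 2: subscribe afterwards. The emission has already happened.
    connectableObservable.subscribe { received.add(it) }

    return received
}

fun main() {
    println(lateSubscriberReceives()) // prints []
}
```

Because everything here runs on a single thread, the result is deterministic: the subscriber never sees "Hello!". The racy version above simply lands in this ordering some of the time.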

There are a few ways around this race condition:
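
One option that fits this situation is autoConnect(), which connects only once the first subscriber has actually subscribed, so the order of events is fixed regardless of which thread performs the subscription. This is a sketch under assumptions: it assumes RxJava 3, the function name helloViaAutoConnect is hypothetical, and toList().blockingGet() is used only so the example can wait for the result.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers

// Hypothetical helper: collects everything the subscriber receives.
fun helloViaAutoConnect(): List<String> =
    Observable.just("Hello!")
        .publish()
        // Connect automatically when the first subscriber arrives,
        // instead of calling connect() by hand on another thread.
        .autoConnect()
        // The subscription still happens on an io() thread...
        .subscribeOn(Schedulers.io())
        // ...but the connection is made only after it is in place.
        .toList()
        .blockingGet()

fun main() {
    println(helloViaAutoConnect()) // prints [Hello!]
}
```

The trade-off is that the moment of connection is no longer under your control: it is tied to the arrival of the first subscriber.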

These alternatives should suit your needs most of the time. As such, I would think carefully before reaching for connect() directly. It can lead to subtle timing bugs, so use it only if you really don't care whether your subscriptions miss events.