ConnectableObservable: So Hot Right Now
ConnectableObservable
does not care when you subscribe to it; instead, it only begins its work when connect()
is called. With that in mind, what is the output of the following?
val connectableObservable = Observable.just("Hello!").publish()
connectableObservable
.subscribeOn(Schedulers.io())
.subscribe { println(it) }
connectableObservable.connect()
If you answered "Hello!" then you'd be right... some of the time. Sometimes, it simply prints nothing.
How can that be? Well, by using subscribeOn()
, we are switching to another thread to initiate the subscription. Now we've introduced concurrency, thus cracking open the door for race conditions.
Here's the order of events as we expect them to happen:
- The subscription is setup and ready to receive data.
- The observable connects, subscribes to the underlying stream, and starts emitting data (which our subscription happily receives).
However, if the subscription thread runs slower than the main thread, the opposite can occur:
- The observable connects, subscribes to the underlying stream, then emits "Hello!" to no one.
- The subscription is setup and ready to receive data, but by now it has missed the boat on the emission!
There are a few ways around this race condition:
- Use
refCount()
orautoConnect()
instead ofconnect()
, so that the connection won't start until there are subscribers. - Use
replay()
instead ofpublish()
, so that late subscribers still get previously emitted events. - Avoid
subscribeOn()
if you don't need it.
These alternatives should suit your needs most of the time. As such, I would consider your situation carefully before using connect()
. It can lead to subtle timing bugs, so use it only if you really don't care if your subscriptions miss events.