Do or do not; there is no tryEmit()

In RxJava, PublishSubject (or PublishRelay if you’re a cool kid) is a gizmo for manually pumping data into streams, as in this contrived example:

private val socketSubject = PublishSubject.create<String>()

fun setupStream() = socketSubject.subscribe(System.out::println)

fun onSocketUpdate(data: String) {
  socketSubject.onNext(data)
}

(Yes, I know we should avoid subjects when possible, but sometimes you can’t, and other times you’re just feeling naughty.)

This pattern served us well for years but now it’s 2021 and we’ve decided that Flow is hot whereas RxJava is not.

Off we go to Google, er, DuckDuckGo, er, Ecosia to search for the “equivalent of PublishSubject for coroutines.” This search, unfortunately, leads to a bunch of outdated garbage right now. You will find plenty of posts about ConflatedBroadcastChannel, but it “turned out to be a design dead-end” and “will be deprecated and removed in the future.”

To save you another five minutes of Ecosia… uh… Ecosing?... the equivalent of PublishSubject is MutableSharedFlow now.

MutableSharedFlow doesn’t look hard to use; once you figure out how CoroutineScopes work, it's practically a drop-in replacement:

private val socketFlow = MutableSharedFlow<String>()

fun setupStream() = socketFlow.onEach(System.out::println)
    .launchIn(GlobalScope)

fun onSocketUpdate(data: String) {
  socketFlow.tryEmit(data)
}

However, nothing prints when I run this code. On top of that, tryEmit() returns false, indicating that it knows it didn't actually emit. Why not?

The answer, it turns out, is in the SharedFlow documentation in a section about unbuffered shared flows:

A default implementation of a shared flow that is created with MutableSharedFlow() constructor function without parameters has no replay cache nor additional buffer. emit call to such a shared flow suspends until all subscribers receive the emitted value and returns immediately if there are no subscribers. Thus, tryEmit call succeeds and returns true only if there are no subscribers (in which case the emitted value is immediately lost).
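The documentation's claim is easy to check. Below is a minimal sketch, assuming kotlinx.coroutines 1.4 or newer (for subscriptionCount); unbufferedTryEmitResults is a hypothetical name. It waits on subscriptionCount so the subscriber is really attached before the second tryEmit() call.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns tryEmit()'s result on a default (unbuffered)
// MutableSharedFlow, first with zero subscribers, then with one.
fun unbufferedTryEmitResults(): Pair<Boolean, Boolean> = runBlocking {
    val flow = MutableSharedFlow<String>() // replay = 0, extraBufferCapacity = 0

    // No subscribers: reports success, but the value is immediately lost.
    val withoutSubscribers = flow.tryEmit("lost")

    // Start one subscriber and wait until it has actually subscribed.
    val job = launch { flow.collect { println(it) } }
    flow.subscriptionCount.first { it > 0 }

    // One subscriber and nowhere to store the value: tryEmit() gives up.
    val withSubscriber = flow.tryEmit("hello")

    job.cancel()
    withoutSubscribers to withSubscriber
}

fun main() {
    println(unbufferedTryEmitResults()) // (true, false)
}
```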

Roman Elizarov puts it another way on this issue:

tryEmit (unlike emit) is not a suspending function, so it clearly cannot operate without a buffer where it can store emitted value for all the suspending subscribers to process. On the other hand, emit is suspending, so it does not need buffer space, as it can always suspend in case any of the subscribers are not ready yet.
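The same unbuffered flow works fine with the suspending emit(). A minimal sketch, assuming kotlinx.coroutines 1.4 or newer; unbufferedEmit is a hypothetical name:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// emit() on an unbuffered MutableSharedFlow: no buffer is needed, because
// emit() can suspend until the subscriber is ready for each value.
fun unbufferedEmit(): List<String> = runBlocking {
    val flow = MutableSharedFlow<String>() // no replay, no extra buffer
    val received = async { flow.take(3).toList() }
    flow.subscriptionCount.first { it > 0 } // wait for the subscriber

    for (value in listOf("a", "b", "c")) {
        flow.emit(value) // suspends as needed instead of dropping the value
    }
    received.await()
}

fun main() {
    println(unbufferedEmit()) // [a, b, c]
}
```

Each emit() call suspends until the subscriber has received the value, so nothing is lost even though the flow has no buffer at all.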

In short, you need a buffer to temporarily store values if you’re using tryEmit(). When you add a buffer to socketFlow, it suddenly starts working:

private val socketFlow = MutableSharedFlow<String>(
    extraBufferCapacity = 1
)

fun setupStream() = socketFlow.onEach(System.out::println)
    .launchIn(GlobalScope)

fun onSocketUpdate(data: String) {
  socketFlow.tryEmit(data)
}
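For experimenting, here is a self-contained sketch of the fixed version that swaps GlobalScope for runBlocking so the program ends. receiveOneUpdate is a hypothetical helper, and kotlinx.coroutines 1.4+ is assumed. It waits for the subscriber via subscriptionCount first, because tryEmit() with zero subscribers still returns true and silently drops the value.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

private val socketFlow = MutableSharedFlow<String>(extraBufferCapacity = 1)

// Non-suspending, like a socket callback; returns tryEmit()'s result.
fun onSocketUpdate(data: String): Boolean = socketFlow.tryEmit(data)

// Hypothetical helper: subscribe, push one update, and report what happened.
fun receiveOneUpdate(): Pair<Boolean, String> = runBlocking {
    val firstUpdate = async { socketFlow.first() }
    socketFlow.subscriptionCount.first { it > 0 } // subscriber is attached
    val emitted = onSocketUpdate("hello")
    emitted to firstUpdate.await()
}

fun main() {
    println(receiveOneUpdate()) // (true, hello)
}
```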

It’s important to note that a buffer is not a replay cache, and using it will not result in replay-like behavior. You get that behavior solely from the replay parameter. (Setting replay would also have fixed the above, but then you'd be introducing possibly undesirable behavior: new collectors would immediately receive the most recently emitted values.)
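One quick way to see the difference is replayCache, which holds whatever a new subscriber would be replayed. A minimal sketch; replayedToLateSubscriber is a hypothetical helper:

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow

// Emits one value while nobody is subscribed, then reports what a
// late subscriber would be replayed.
fun replayedToLateSubscriber(flow: MutableSharedFlow<String>): List<String> {
    flow.tryEmit("early") // no subscribers at this point
    return flow.replayCache
}

fun main() {
    // Extra buffer only: the value was dropped, nothing to replay.
    println(replayedToLateSubscriber(MutableSharedFlow(extraBufferCapacity = 1))) // []
    // Replay cache: the value is kept for future subscribers.
    println(replayedToLateSubscriber(MutableSharedFlow(replay = 1))) // [early]
}
```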

If you expect a particular call to tryEmit() to always succeed, you could even add this extension function that blows up when it doesn't:

fun <T> MutableSharedFlow<T>.doEmit(value: T) {
  check(tryEmit(value))
}
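Here is a variant of that extension with an error message, plus a hypothetical test helper (doEmitWithOneSubscriber) that shows when it throws. A sketch assuming kotlinx.coroutines 1.4+:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Same idea as doEmit() above, with a message for the failure case.
fun <T> MutableSharedFlow<T>.doEmit(value: T) {
    check(tryEmit(value)) { "tryEmit($value) failed: no buffer space available" }
}

// Hypothetical helper: with one active subscriber, returns null if doEmit()
// succeeded, or the error message if it threw.
fun doEmitWithOneSubscriber(flow: MutableSharedFlow<String>): String? = runBlocking {
    val job = launch { flow.collect { } }
    flow.subscriptionCount.first { it > 0 }
    val error = try {
        flow.doEmit("hello")
        null
    } catch (e: IllegalStateException) {
        e.message
    }
    job.cancel()
    error
}

fun main() {
    println(doEmitWithOneSubscriber(MutableSharedFlow()))                        // the error message
    println(doEmitWithOneSubscriber(MutableSharedFlow(extraBufferCapacity = 1))) // null
}
```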

Although I have a working solution now, I am still left with two questions:

  • How the heck does tryEmit() sneak values into flows when it isn't a suspending function?
  • Why exactly was this buffer necessary, anyways?

I dug into the internals of MutableSharedFlow and I think I figured out the answers...

Under the hood, suspending functions are compiled into state machines driven by Continuations. A Continuation is essentially a callback that resumes a suspended coroutine with a value. Continuations can be resumed from anywhere, including ordinary non-suspending code, so that's how MutableSharedFlow hooks into coroutines.
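The Continuation idea can be shown with nothing but the Kotlin standard library (kotlin.coroutines). In this sketch, a hypothetical, deliberately non-thread-safe Mailbox class lets a coroutine park its Continuation, and an ordinary non-suspending function resumes it with a value later:

```kotlin
import kotlin.coroutines.*

// Hypothetical single-slot mailbox (not thread-safe): a coroutine parks its
// Continuation here, and ordinary code resumes it with a value later.
class Mailbox {
    private var parked: Continuation<String>? = null

    suspend fun receive(): String = suspendCoroutine { cont -> parked = cont }

    // Plain function: no `suspend` modifier needed to resume a coroutine.
    fun deliver(value: String): Boolean {
        val cont = parked ?: return false // nobody is waiting
        parked = null
        cont.resume(value)
        return true
    }
}

fun main() {
    val mailbox = Mailbox()
    val received = mutableListOf<String>()

    // Start a coroutine with no kotlinx.coroutines machinery; it runs until
    // receive() suspends, then control returns here.
    suspend { received.add(mailbox.receive()) }
        .startCoroutine(Continuation(EmptyCoroutineContext) { it.getOrThrow() })
    println(received) // []

    mailbox.deliver("hello") // resumes the coroutine right here, on this thread
    println(received) // [hello]
}
```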

But... what if you try to emit to a Continuation that is already in use? A single coroutine runs its code sequentially, and a Continuation can only be resumed once; you could break things trying to run the same Continuation simultaneously in two places at once.
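The standard library enforces the resume-once rule itself. In this minimal sketch (resumeTwice is a hypothetical name), resuming the same Continuation a second time throws an IllegalStateException instead of running the coroutine again:

```kotlin
import kotlin.coroutines.*

// Captures a coroutine's Continuation, then tries to resume it twice.
// Returns the message of the error raised by the second resume, if any.
fun resumeTwice(): String? {
    var captured: Continuation<Int>? = null
    suspend { suspendCoroutine<Int> { cont -> captured = cont } }
        .startCoroutine(Continuation(EmptyCoroutineContext) { it.getOrThrow() })

    val cont = captured!!
    cont.resume(1) // fine: the coroutine finishes with result 1
    return try {
        cont.resume(2) // the same Continuation, resumed again
        null
    } catch (e: IllegalStateException) {
        e.message
    }
}

fun main() {
    println(resumeTwice()) // prints the stdlib's error message
}
```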

When you call tryEmit(), there's a lot of complicated bookkeeping (done under a lock) to find any collectors whose Continuations are suspended and waiting for a new value. Finding those Continuations is blocking, but resuming them is non-blocking: each collector is handed back to its dispatcher and runs later. That's how tryEmit() can magically exist; it schedules work in a blocking manner, but that work then executes in a non-blocking way.
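On a single-threaded dispatcher you can watch this split happen. In this sketch (emitThenCollectOrder is a hypothetical name; kotlinx.coroutines 1.4+ assumed), tryEmit() returns before the collector has run, and the collector only logs its value once the caller yields:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Records the order of events when a plain (non-suspending) tryEmit()
// hands a value to a collector on runBlocking's single-threaded dispatcher.
fun emitThenCollectOrder(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val flow = MutableSharedFlow<String>(extraBufferCapacity = 1)
    val job = launch { flow.collect { log += "collected $it" } }
    flow.subscriptionCount.first { it > 0 }

    // The bookkeeping happens inside this call; the collector is only scheduled.
    log += "tryEmit returned ${flow.tryEmit("hello")}"

    yield() // let the scheduled collector run
    job.cancel()
    log
}

fun main() {
    println(emitThenCollectOrder()) // [tryEmit returned true, collected hello]
}
```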

Now, another question: If the Continuation is asynchronous, where does it get the emission value from? There's got to be a place to store that value temporarily before the Continuation starts, while we're still setting everything up. Thus: the buffer.
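To make the buffer's role concrete, here is a deliberately simplified toy model using only the standard library. ToySharedFlow is hypothetical and supports a single collector; it is not how kotlinx.coroutines actually implements SharedFlow. Its tryEmit() stores the value in a buffer under a lock, then resumes the parked collector with Unit, so the collector has to fetch the value from the buffer itself. With capacity = 0 there is nowhere to put the value, so tryEmit() fails.

```kotlin
import kotlin.coroutines.*

// TOY model, single collector only; not the real SharedFlow implementation.
class ToySharedFlow<T : Any>(private val capacity: Int) {
    private val lock = Any()
    private val buffer = ArrayDeque<T>()
    private var parkedCollector: Continuation<Unit>? = null

    // Non-suspending: store the value under the lock, then wake the collector.
    fun tryEmit(value: T): Boolean {
        val toResume = synchronized(lock) {
            if (buffer.size >= capacity) return false // nowhere to keep the value
            buffer.addLast(value)
            parkedCollector.also { parkedCollector = null }
        }
        toResume?.resume(Unit) // outside the lock; note that no value is passed
        return true
    }

    // Suspending: take values from the buffer, parking when it is empty.
    suspend fun collect(action: (T) -> Unit) {
        while (true) {
            val next = synchronized(lock) { buffer.removeFirstOrNull() }
            if (next != null) {
                action(next)
                continue
            }
            suspendCoroutine<Unit> { cont ->
                // Re-check under the lock so a concurrent tryEmit() is not missed.
                val ready = synchronized(lock) {
                    if (buffer.isEmpty()) { parkedCollector = cont; false } else true
                }
                if (ready) cont.resume(Unit)
            }
        }
    }
}

fun main() {
    val flow = ToySharedFlow<String>(capacity = 1)
    suspend { flow.collect { println("collected $it") } }
        .startCoroutine(Continuation(EmptyCoroutineContext) { it.getOrThrow() })
    println(flow.tryEmit("hello"))                 // prints "collected hello", then true
    println(ToySharedFlow<String>(0).tryEmit("x")) // false: no buffer space
}
```

Because this toy resumes the Continuation directly, without a dispatcher, the collector runs on the emitting thread before tryEmit() returns; a real collector would normally be dispatched and run later.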

For the single value/single collector case, the buffer feels unnecessary: all you're doing is passing one value to a collector as soon as it's ready. But what if you've got tons of emissions going to multiple collectors? Now you've got a real headache managing those values, and since tryEmit() can't suspend, you actually have to think about backpressure (which is why onBufferOverflow is the third parameter of MutableSharedFlow). That's why the buffer (managed in a blocking manner, before the collectors spin up) is required.
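The backpressure trade-off shows up as soon as two values arrive before the collector runs. A minimal sketch (burst is a hypothetical helper; kotlinx.coroutines 1.4+ assumed) comparing the default BufferOverflow.SUSPEND with BufferOverflow.DROP_OLDEST on a one-slot buffer:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*

// Two back-to-back tryEmit() calls before the single collector gets a chance
// to run, with a one-slot buffer and the given overflow strategy.
fun burst(overflow: BufferOverflow): Pair<List<Boolean>, String> = runBlocking {
    val flow = MutableSharedFlow<String>(extraBufferCapacity = 1, onBufferOverflow = overflow)
    val received = async { flow.first() }
    flow.subscriptionCount.first { it > 0 }

    val results = listOf(flow.tryEmit("a"), flow.tryEmit("b"))
    results to received.await()
}

fun main() {
    println(burst(BufferOverflow.SUSPEND))     // ([true, false], a)
    println(burst(BufferOverflow.DROP_OLDEST)) // ([true, true], b)
}
```

With SUSPEND, tryEmit() refuses the second value because it cannot suspend to wait for space; with DROP_OLDEST it always succeeds, but the collector only sees the newest value. BufferOverflow.DROP_LATEST is the third option: it keeps the buffered value and discards the new one.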

In short: tryEmit() manages a blocking (non-suspending) buffer, then hands the values off to non-blocking (suspending) coroutines.