Testing Never-Ending Flows

Suppose we’ve got an RxJava test to verify that a stream never emits anything*:

@Test
fun verifyNeverEmits() {
  Observable.never<String>().subscribe { fail() }
}

How would the equivalent one look like for Flow? Suppose I’m testing a theoretical neverFlow():

@Test
fun verifyNeverCollects() = runBlocking {
  neverFlow().collect { fail() }
}

There’s a problem here, though: the Flow version of this test hangs and never finishes. Huh?

The reason for this is structured concurrency. runBlocking() doesn’t finish until everything inside of it finishes, and by definition, neverFlow() doesn’t finish.

The RxJava version of this test works because RxJava does not use structured concurrency. Indeed, after the test ends, there’s a dangling Disposable floating in the ether (hopefully to be garbage collected).


This problem will come up for any never-ending stream, but there are a couple ways to handle it.

If we're just expecting a few emissions, we can terminate the stream using something like take(n):

@Test
fun verifySomething() = runBlocking {
  someInfiniteFlow().take(1).collect { assertEquals(1, it) }
}

Alternatively, we can set ourselves up for canceling the work at the end of the test:

@Test
fun verifyNeverCollects() = runBlocking {
  val job = launch {
    neverFlow().collect { fail() }
  }
  job.cancel()
}

Don't want to do that yourself? Turbine is small testing library for Flow, and part of its utility is in automatically canceling tests. Here's how the above looks like in Turbine:

@Test
fun verifyNeverCollects() = runBlocking {
  neverFlow().test { expectNoEvents() }
}

While this seems like extra work (compared to RxJava), ultimately the Flow tests are going to be more correct due to adhering to structured concurrency.


* Nevermind that we're going to run afoul of the Halting Problem here!