Loading data from multiple sources with RxJava

Suppose I have some Data that I query from the network. I could simply hit the network each time I need the data, but caching the data on disk and in memory would be much more efficient.

More specifically, I want a setup that:

  1. Occasionally performs queries from the network for fresh data.
  2. Retrieves data as quickly as possible otherwise (by caching network results).

I'd like to present an implementation of this setup using RxJava.

Basic Pattern

Given an Observable<Data> for each source (network, disk and memory), we can construct a simple solution using two operators, concat() and first().

concat() takes multiple Observables and concatenates their sequences, one after another. first() emits only the first item from a sequence. Therefore, concat().first() retrieves the first item emitted by any of the sources, checked in the order you list them.

Let's see it in action:

// Our sources (left as an exercise for the reader)
Observable<Data> memory = ...;
Observable<Data> disk = ...;
Observable<Data> network = ...;

// Retrieve the first source with data
Observable<Data> source = Observable
  .concat(memory, disk, network)
  .first();

The key to this pattern is that concat() only subscribes to each child Observable when it needs to. There's no unnecessary querying of slower sources if data is cached, since first() will stop the sequence early. In other words, if memory returns a result, then we won't bother going to disk or network. Conversely, if neither memory nor disk has data, concat() moves on to the network and makes a new request.

Note that the order of the source Observables in concat() matters, since it's checking them one-by-one.
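
To make the laziness concrete without pulling in RxJava, here's a rough plain-Java analogy built on java.util.stream. It is not how concat() is implemented; it only mimics the "check each source in order, stop at the first hit" behavior. LazySourcesSketch, source() and firstAvailable() are names I made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Stream;

// Plain-Java analogy for concat().first(); this is NOT RxJava.
// All names here (LazySourcesSketch, source, firstAvailable) are hypothetical.
class LazySourcesSketch {
  // Records which sources were actually queried, in order.
  static final List<String> queried = new ArrayList<>();

  // Builds a named source that logs each query and returns the value (or empty if null).
  static Supplier<Optional<String>> source(String name, String value) {
    return () -> {
      queried.add(name);
      return Optional.ofNullable(value);
    };
  }

  // Queries sources in order and stops at the first one that has data.
  @SafeVarargs
  static Optional<String> firstAvailable(Supplier<Optional<String>>... sources) {
    return Stream.of(sources)
        .map(Supplier::get)            // evaluated lazily, one source at a time
        .filter(Optional::isPresent)   // skip sources with no data
        .map(Optional::get)
        .findFirst();                  // short-circuits once a value is found
  }

  public static void main(String[] args) {
    Optional<String> result = firstAvailable(
        source("memory", null),
        source("disk", "cached on disk"),
        source("network", "fresh from network"));
    // prints "cached on disk / queried: [memory, disk]"
    System.out.println(result.orElse("no data") + " / queried: " + queried);
  }
}
```

Note that the network source never shows up in the queried list, because disk already had data. Swap the order of the arguments and you change which source wins.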

Saving Data

The obvious next step is to save data as it comes in. If you don't save the results of the network request to disk, or cache disk requests in memory, you'll never see any savings! All the above code would do is constantly make network requests.

My solution is to have each source save/cache data as it emits it:

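// saveToDisk() and cacheInMemory() are placeholder names for your own persistence code
Observable<Data> networkWithSave = network.doOnNext(data -> {
  saveToDisk(data);
  cacheInMemory(data);
});
Observable<Data> diskWithCache = disk.doOnNext(data -> cacheInMemory(data));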

Now, if you use networkWithSave and diskWithCache, the data will automatically be stored as you load it.

(Another advantage of this tactic is that networkWithSave/diskWithCache can be used anywhere, not just in our multiple sources pattern.)
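
If you want to watch the savings happen, here's a small plain-Java sketch of the same write-through idea. It's an analogy rather than RxJava code: the "disk" is just a field, the "network" is a counter, and every name in it (WriteThroughSketch, load(), and so on) is an assumption of mine.

```java
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Stream;

// Plain-Java analogy for the doOnNext() save/cache side effects; NOT RxJava.
// Every name here is hypothetical, and "disk" is simulated with a field.
class WriteThroughSketch {
  String memoryCache;      // stands in for the in-memory cache
  String diskCache;        // stands in for on-disk storage
  int networkCalls = 0;    // counts simulated network requests

  Optional<String> memory() {
    return Optional.ofNullable(memoryCache);
  }

  // Like diskWithCache: a disk hit is also copied into memory.
  Optional<String> diskWithCache() {
    Optional<String> data = Optional.ofNullable(diskCache);
    data.ifPresent(d -> memoryCache = d);
    return data;
  }

  // Like networkWithSave: a network result is written to disk and memory.
  Optional<String> networkWithSave() {
    networkCalls++;
    String data = "response #" + networkCalls;
    diskCache = data;
    memoryCache = data;
    return Optional.of(data);
  }

  // Checks memory, then disk, then network, stopping at the first hit.
  String load() {
    return Stream.<Supplier<Optional<String>>>of(
            this::memory, this::diskWithCache, this::networkWithSave)
        .map(Supplier::get)
        .filter(Optional::isPresent)
        .map(Optional::get)
        .findFirst()
        .orElseThrow(() -> new IllegalStateException("no source had data"));
  }

  public static void main(String[] args) {
    WriteThroughSketch cache = new WriteThroughSketch();
    cache.load();                // goes to the network and saves the result
    cache.load();                // served from memory
    cache.memoryCache = null;    // simulate a restart that wipes memory
    cache.load();                // served from disk, then re-cached in memory
    System.out.println("network calls: " + cache.networkCalls); // prints "network calls: 1"
  }
}
```

Three loads, one network call. Take the two assignments out of networkWithSave() and every load goes back to the network.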

Stale Data

Unfortunately, now our data-saving code is working a little too well! It's always returning the same data, no matter how out-of-date it is. Remember, we'd like to go back to the server occasionally for fresh data.

The solution is in first(), which can also perform filtering. Just set it up to reject data that isn't worthy:

Observable<Data> source = Observable  
  .concat(memory, diskWithCache, networkWithSave)
  .first(data -> data.isUpToDate());

Now we'll only emit the first item that qualifies as up-to-date. Thus, if one of our sources has stale Data, we'll continue on to the next one until we find fresh Data.
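
What counts as "up-to-date" is up to you. One possible sketch, assuming Data carries a creation timestamp and goes stale after a fixed interval (the timestamp field, the STALE_MS threshold and the overload taking an explicit clock value are all my assumptions here):

```java
// Hypothetical sketch of a Data class with a freshness check.
// The timestamp field and the STALE_MS threshold are assumptions.
class Data {
  static final long STALE_MS = 5_000;  // arbitrary: older than 5 seconds counts as stale

  final String value;
  final long timestamp;                // creation time, in milliseconds

  Data(String value, long timestamp) {
    this.value = value;
    this.timestamp = timestamp;
  }

  // True while the data is younger than STALE_MS, measured against the system clock.
  boolean isUpToDate() {
    return isUpToDate(System.currentTimeMillis());
  }

  // Same check against an explicit "now", which keeps the logic easy to test.
  boolean isUpToDate(long nowMs) {
    return nowMs - timestamp < STALE_MS;
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    System.out.println(new Data("fresh", now).isUpToDate(now));          // prints "true"
    System.out.println(new Data("stale", now - 10_000).isUpToDate(now)); // prints "false"
  }
}
```

With a check like this, a cached copy simply stops qualifying once it ages out, and the lookup falls through to the next source.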

first() vs. takeFirst()

As an alternative to using first() for this pattern, you could also use takeFirst().

The difference between the two calls is that first() will throw a NoSuchElementException if none of the sources emits valid data, whereas takeFirst() will simply complete without exception.

Which you use depends on whether you need to explicitly handle a lack of data or not.
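
The two behaviors are easy to picture with a plain-Java analogy (again, not RxJava itself): one method treats "nothing found" as an error, the other returns an empty result. The class and method names below are hypothetical.

```java
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.stream.Stream;

// Plain-Java analogy for the two "no valid data" behaviors; NOT RxJava.
// All names here are hypothetical.
class EmptyResultSketch {
  // Like first(): having no item at all is treated as an error.
  static String firstOrThrow(Stream<String> items) {
    return items.findFirst()
        .orElseThrow(() -> new NoSuchElementException("no source had valid data"));
  }

  // Like takeFirst(): having no item at all is just an empty result.
  static Optional<String> firstOrEmpty(Stream<String> items) {
    return items.findFirst();
  }

  public static void main(String[] args) {
    // Empty result: the caller checks for presence and moves on.
    System.out.println(firstOrEmpty(Stream.<String>empty()).isPresent()); // prints "false"

    // Error: the caller has to handle the exception explicitly.
    try {
      firstOrThrow(Stream.<String>empty());
    } catch (NoSuchElementException e) {
      System.out.println("caught: " + e.getMessage()); // prints "caught: no source had valid data"
    }
  }
}
```

In Rx terms, the first style means handling the failure in onError, while the second just completes without ever calling onNext.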

Code Samples

A sample implementation of the above code is available here: https://github.com/dlew/rxjava-multiple-sources-sample

If you'd like a real-world example, check out the Gfycat app, which uses this pattern when retrieving Gfycat metadata. The code doesn't use all the capabilities shown above (since it doesn't need them), but it demonstrates the basic concat().first() setup.
