Loading data from multiple sources with RxJava
Suppose I have some Data that I query from the network. I could simply hit the network each time I need the data, but caching the data on disk and in memory would be much more efficient.
More specifically, I want a setup that:
- Occasionally performs queries from the network for fresh data.
- Retrieves data as quickly as possible otherwise (by caching network results).
I'd like to present an implementation of this setup using RxJava.
Basic Pattern
Given an Observable<Data> for each source (network, disk and memory), we can construct a simple solution using two operators, concat() and first().
concat() takes multiple Observables and concatenates their sequences. first() emits only the first item from a sequence. Therefore, concat().first() retrieves the first item emitted across all of the sources, checked in order.
Let's see it in action:
// Our sources (left as an exercise for the reader)
Observable<Data> memory = ...;
Observable<Data> disk = ...;
Observable<Data> network = ...;
// Retrieve the first source with data
Observable<Data> source = Observable
    .concat(memory, disk, network)
    .first();
The key to this pattern is that concat() only subscribes to each child Observable when it needs to. If the data is cached, slower sources are never queried, because first() stops the sequence as soon as it receives an item. In other words, if memory returns a result, then we won't bother going to disk or network. Conversely, if neither memory nor disk has data, it'll make a new network request.
Note that the order of the source Observables in concat() matters, since it checks them one by one.
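To make the laziness and the ordering concrete, here is a small plain-Java model of the pattern. It is not RxJava: each source is a hypothetical Supplier<Optional<String>>, and a sequential java.util.stream pipeline ending in findFirst() stands in for concat().first(), since streams also pull elements one at a time and stop at the first match. ConcatFirstModel, source() and concatFirst() are names made up for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.function.Supplier;

// A plain-Java model of concat(...).first(); this is NOT RxJava.
// Each source is a Supplier that returns its data, or Optional.empty() if it has none.
final class ConcatFirstModel {
    // Names of the sources that were actually queried, so the laziness is visible.
    static final List<String> queried = new ArrayList<>();

    // Hypothetical source: records its name when queried, then returns its data (or empty).
    static Supplier<Optional<String>> source(String name, String data) {
        return () -> {
            queried.add(name);
            return Optional.ofNullable(data);
        };
    }

    // Mirrors concat(sources...).first(): query the sources in order, stop at the
    // first one that has data, and throw NoSuchElementException if none do.
    @SafeVarargs
    static String concatFirst(Supplier<Optional<String>>... sources) {
        return Arrays.stream(sources)
                .map(Supplier::get)          // a source is only queried when it is reached
                .filter(Optional::isPresent)
                .map(Optional::get)
                .findFirst()                 // short-circuits: later sources are skipped
                .orElseThrow(NoSuchElementException::new);
    }

    public static void main(String[] args) {
        // Memory has data, so disk and network are never queried.
        queried.clear();
        System.out.println(concatFirst(
                source("memory", "from memory"),
                source("disk", "from disk"),
                source("network", "from network")));  // prints "from memory"
        System.out.println(queried);                   // prints [memory]

        // Memory and disk are empty, so we fall through to the network.
        queried.clear();
        System.out.println(concatFirst(
                source("memory", null),
                source("disk", null),
                source("network", "from network")));  // prints "from network"
        System.out.println(queried);                   // prints [memory, disk, network]

        // Order matters: with the network listed first, it is always the one queried.
        queried.clear();
        System.out.println(concatFirst(
                source("network", "from network"),
                source("memory", "from memory")));    // prints "from network"
        System.out.println(queried);                   // prints [network]
    }
}
```

The query logs show the behavior described above: a memory hit never touches disk or network, an empty cache falls all the way through to the network, and reordering the sources changes which one is consulted first.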
Saving Data
The obvious next step is to save data from each source as it comes in. If you don't save the results of network requests to disk, or cache disk results in memory, you'll never see any savings! As written, the code above would just keep making network requests.
My solution is to have each source save/cache data as it emits it:
Observable<Data> networkWithSave = network.doOnNext(data -> {
    saveToDisk(data);
    cacheInMemory(data);
});
Observable<Data> diskWithCache = disk.doOnNext(data -> {
    cacheInMemory(data);
});
Now, if you use networkWithSave and diskWithCache, the data will automatically be stored as you load it.
(Another advantage of this tactic is that networkWithSave/diskWithCache can be used anywhere, not just in our multiple sources pattern.)
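The save-as-you-emit idea can be modeled the same way. Again this is a plain-Java sketch, not RxJava: the doOnNext() helper below only imitates the RxJava operator by running a side effect on each item before passing it along, and memoryCache, diskStore and the counting network source are hypothetical stand-ins for real storage and a real request.

```java
import java.util.Arrays;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Plain-Java model (NOT RxJava) of sources that save data as they emit it.
final class SaveOnEmitModel {
    // Hypothetical stand-ins for the in-memory cache, the on-disk store, and a request counter.
    static String memoryCache = null;
    static String diskStore = null;
    static int networkCalls = 0;

    static final Supplier<Optional<String>> memory = () -> Optional.ofNullable(memoryCache);
    static final Supplier<Optional<String>> disk = () -> Optional.ofNullable(diskStore);
    // Hypothetical network source: every call counts as a new request.
    static final Supplier<Optional<String>> network = () -> {
        networkCalls++;
        return Optional.of("data #" + networkCalls);
    };

    // Imitates doOnNext(): run a side effect on the emitted item, then pass it through unchanged.
    static Supplier<Optional<String>> doOnNext(Supplier<Optional<String>> source, Consumer<String> action) {
        return () -> {
            Optional<String> result = source.get();
            result.ifPresent(action);
            return result;
        };
    }

    static final Supplier<Optional<String>> networkWithSave = doOnNext(network, data -> {
        diskStore = data;    // stands in for saveToDisk(data)
        memoryCache = data;  // stands in for cacheInMemory(data)
    });
    static final Supplier<Optional<String>> diskWithCache =
            doOnNext(disk, data -> memoryCache = data);  // stands in for cacheInMemory(data)

    // Same lazy, in-order search as concat(...).first().
    @SafeVarargs
    static String concatFirst(Supplier<Optional<String>>... sources) {
        return Arrays.stream(sources)
                .map(Supplier::get)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .findFirst()
                .orElseThrow(NoSuchElementException::new);
    }

    static String load() {
        return concatFirst(memory, diskWithCache, networkWithSave);
    }

    public static void main(String[] args) {
        System.out.println(load());       // prints "data #1": memory and disk are empty, so the network is hit
        System.out.println(load());       // prints "data #1": now served from memory
        System.out.println(networkCalls); // prints 1

        memoryCache = null;               // simulate losing the memory cache while disk survives
        System.out.println(load());       // prints "data #1": served from disk
        System.out.println(memoryCache);  // prints "data #1": the disk read refilled memory
    }
}
```

The second load() is answered from memory without another network call, and clearing memory falls back to disk, which refills the memory cache on the way through.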
Stale Data
Unfortunately, now our data-saving code is working a little too well! It's always returning the same data, no matter how out-of-date it is. Remember, we'd like to go back to the server occasionally for fresh data.
The solution is in first(), which can also perform filtering. Just set it up to reject data that isn't worthy:
Observable<Data> source = Observable
    .concat(memory, diskWithCache, networkWithSave)
    .first(data -> data.isUpToDate());
Now we'll only emit the first item that qualifies as up-to-date. Thus, if one of our sources has stale Data, we'll continue on to the next one until we find fresh Data.
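Here is a sketch of the filtering version, still as a plain-Java model rather than RxJava. The Data class, its timestamp field and the one-minute freshness window are assumptions for illustration, since the article doesn't say how isUpToDate() decides freshness; here isUpToDate() takes the current time and a maximum age as parameters so the example is deterministic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Plain-Java model (NOT RxJava) of concat(...).first(predicate).
final class FreshFirstModel {
    // Hypothetical Data type: "up to date" means fetched within the last maxAgeMillis.
    static final class Data {
        final String value;
        final long fetchedAtMillis;

        Data(String value, long fetchedAtMillis) {
            this.value = value;
            this.fetchedAtMillis = fetchedAtMillis;
        }

        boolean isUpToDate(long nowMillis, long maxAgeMillis) {
            return nowMillis - fetchedAtMillis <= maxAgeMillis;
        }
    }

    // Names of the sources that were actually queried.
    static final List<String> queried = new ArrayList<>();

    // Hypothetical source: records its name when queried, then returns its data (or empty).
    static Supplier<Optional<Data>> source(String name, Data data) {
        return () -> {
            queried.add(name);
            return Optional.ofNullable(data);
        };
    }

    // Mirrors concat(sources...).first(predicate): skip data that fails the
    // predicate and stop at the first item that passes.
    @SafeVarargs
    static Data firstMatching(Predicate<Data> isWorthy, Supplier<Optional<Data>>... sources) {
        return Arrays.stream(sources)
                .map(Supplier::get)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .filter(isWorthy)   // stale data is rejected, so the search moves on
                .findFirst()
                .orElseThrow(NoSuchElementException::new);
    }

    public static void main(String[] args) {
        long now = 100_000L;
        long maxAge = 60_000L; // hypothetical freshness window: one minute
        Predicate<Data> upToDate = data -> data.isUpToDate(now, maxAge);

        Data result = firstMatching(upToDate,
                source("memory", new Data("old copy", 10_000L)),    // 90 s old: stale
                source("disk", new Data("recent copy", 70_000L)),   // 30 s old: fresh
                source("network", new Data("new copy", now)));
        System.out.println(result.value); // prints "recent copy"
        System.out.println(queried);      // prints [memory, disk]
    }
}
```

The stale memory copy is skipped, the fresh disk copy wins, and the network is never queried.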
first() vs. takeFirst()
As an alternative to using first() for this pattern, you could also use takeFirst().
The difference between the two calls is that first() will emit a NoSuchElementException (as an onError notification) if none of the sources emits valid data, whereas takeFirst() will simply complete without an error.
Which you use depends on whether you need to explicitly handle a lack of data or not.
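The difference when nothing qualifies can be sketched with the same kind of model. This is not RxJava: here first() throws the NoSuchElementException directly, whereas RxJava delivers it to the subscriber's onError, and an empty Optional stands in for takeFirst() completing without emitting anything. FirstVsTakeFirstModel and its sources are hypothetical.

```java
import java.util.Arrays;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Plain-Java model (NOT RxJava) contrasting first(predicate) with takeFirst(predicate).
final class FirstVsTakeFirstModel {
    // Like takeFirst(predicate): an empty result stands in for "completed without emitting".
    @SafeVarargs
    static <T> Optional<T> takeFirst(Predicate<T> isWorthy, Supplier<Optional<T>>... sources) {
        return Arrays.stream(sources)
                .map(Supplier::get)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .filter(isWorthy)
                .findFirst();
    }

    // Like first(predicate): the same search, but a missing result is treated as an error.
    @SafeVarargs
    static <T> T first(Predicate<T> isWorthy, Supplier<Optional<T>>... sources) {
        return takeFirst(isWorthy, sources)
                .orElseThrow(() -> new NoSuchElementException("no source emitted valid data"));
    }

    public static void main(String[] args) {
        Supplier<Optional<String>> memory = () -> Optional.of("stale");
        Supplier<Optional<String>> network = Optional::empty; // e.g. the device is offline
        Predicate<String> isFresh = data -> !data.equals("stale");

        // takeFirst(): nothing qualifies, so we just get an empty result.
        System.out.println(takeFirst(isFresh, memory, network).isPresent()); // prints false

        // first(): nothing qualifies, so the caller has to handle the error.
        try {
            first(isFresh, memory, network);
        } catch (NoSuchElementException e) {
            System.out.println("first() failed: " + e.getMessage());
            // prints "first() failed: no source emitted valid data"
        }
    }
}
```

With first(), the "no data" case has to be handled explicitly; with takeFirst(), it simply looks like an empty sequence.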
Code Samples
An implementation of the above code is available here: https://github.com/dlew/rxjava-multiple-sources-sample
If you'd like a real-world example, check out the Gfycat app, which uses this pattern when retrieving Gfycat metadata. The code doesn't use all the capabilities shown above (since it doesn't need them), but it demonstrates the basic concat().first() setup.