Deferring Observable code until subscription in RxJava

I've grown fond of RxJava's defer() as a tool for ensuring Observable code runs when subscribed (rather than when created). I've written about defer() before, but I'd like to go into more detail here.

Suppose you've got this data class:

public class SomeType {  
  private String value;

  public void setValue(String value) {
    this.value = value;
  }

  public Observable<String> valueObservable() {
    return Observable.just(value);
  }
}

What do you think will be printed when I run this code?

SomeType instance = new SomeType();  
Observable<String> value = instance.valueObservable();  
instance.setValue("Some Value");  
value.subscribe(System.out::println);  

If you guessed "Some Value", you're wrong. It actually prints "null", because value had not been set yet when Observable.just() was called.

just(), from(), and other Observable creation tools capture their data when the Observable is created, not when it is subscribed to. That's not the behavior I want here: valueObservable() should emit whatever the value is at the moment someone subscribes.
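The same eager-versus-lazy distinction shows up in plain Java, with no RxJava involved. Here's a rough analogy using java.util.function.Supplier. To be clear, this is not how just() is implemented, and the class and method names (EagerVsLazy, eager(), lazy()) are made up for illustration:

```java
import java.util.function.Supplier;

// Plain-Java analogy only; no RxJava types are used here.
final class EagerVsLazy {
  private String value;

  void setValue(String value) {
    this.value = value;
  }

  // Reads the field immediately, the way Observable.just(value) captures it.
  Supplier<String> eager() {
    String snapshot = value;
    return () -> snapshot;
  }

  // Reads the field only when get() is called.
  Supplier<String> lazy() {
    return () -> value;
  }

  public static void main(String[] args) {
    EagerVsLazy instance = new EagerVsLazy();
    Supplier<String> eager = instance.eager();
    Supplier<String> lazy = instance.lazy();
    instance.setValue("Some Value");
    System.out.println(eager.get()); // prints "null"
    System.out.println(lazy.get());  // prints "Some Value"
  }
}
```

The eager version behaves like my original valueObservable(): it took its snapshot before setValue() ran.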

Do It Yourself

One solution is to use Observable.create() since it allows you to control the sequence precisely for each subscriber:

public Observable<String> valueObservable() {  
  return Observable.create(subscriber -> {
    // Runs once per subscriber, so value is read at subscription time.
    subscriber.onNext(value);
    subscriber.onCompleted();
  });
}

Now valueObservable() will emit the current value when subscribed. It's roughly equivalent to what Observable.just() does, except it retrieves value on subscription (not creation).

The only issue here is that I have been wary of writing custom operators since reading Dávid Karnok's excellent series of articles on operators. My main takeaway from the series is that operators are tricky to write correctly. Just take a look at this post, which shows how Observable.just() will need to change in order to support backpressure and unsubscription.

While the above code works now, how do I know it will keep working in future versions of RxJava? And how do I know I've safely covered all my bases, like backpressure and unsubscription? I'm not an expert on operator development, so I've tried to avoid custom operators unless necessary.

The Simple Way

Here's an alternative that uses no custom operators:

public Observable<String> valueObservable() {  
  return Observable.defer(() -> Observable.just(value));
}

All I did was wrap the original code with defer(), but now the behavior is what I want. None of the code inside of defer() is executed until subscription. We only call Observable.just() when someone requests the data.

I prefer this solution for a couple of reasons:

  1. It's simpler than Observable.create() - no need to call onCompleted().
  2. It relies only on built-in operators, which are (probably) implemented correctly.

The only downside to defer() is that it creates a new Observable each time you get a subscriber. create() can reuse the same function for every subscriber, so it does less work per subscription. As always, measure performance and optimize if necessary.
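To make the "new Observable per subscriber" point concrete, here's a toy model of just() and defer() in plain Java. It is only a sketch: Source, ToyDefer, and factoryCallsForTwoSubscribers() are hypothetical names, and RxJava's real implementation also deals with completion, errors, backpressure, and unsubscription, all of which this ignores.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Toy model for illustration; these are NOT RxJava's types or its implementation.
final class ToyDefer {
  interface Source<T> {
    void subscribe(Consumer<T> onNext);
  }

  // Captures the item right away, as Observable.just() does.
  static <T> Source<T> just(T item) {
    return onNext -> onNext.accept(item);
  }

  // Calls the factory once per subscriber, at subscription time.
  static <T> Source<T> defer(Supplier<Source<T>> factory) {
    return onNext -> factory.get().subscribe(onNext);
  }

  // Subscribes twice and returns how many times the factory ran.
  static int factoryCallsForTwoSubscribers() {
    int[] calls = {0};
    Source<String> source = defer(() -> {
      calls[0]++;
      return just("value");
    });
    List<String> received = new ArrayList<>();
    source.subscribe(received::add);
    source.subscribe(received::add);
    return calls[0];
  }

  public static void main(String[] args) {
    System.out.println(factoryCallsForTwoSubscribers()); // prints "2"
  }
}
```

Two subscribers means two factory calls and two inner sources. For something as cheap as just() that hardly matters, but it's the cost to keep in mind.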

Going Deeper

The code above is simple for pedagogical reasons, but realistically we could have replaced all of it with a BehaviorSubject. Let's take a look at something more complex.

Suppose we want a method which writes data to disk, then returns that data. This is one way to do it with defer():

public Observable<SomeType> createSomeType(String value) {  
  return Observable.defer(() -> {
    // Nothing in this block runs until someone subscribes.
    SomeType someType = new SomeType();
    someType.setValue(value);

    try {
      db.writeToDisk(someType);
    }
    catch (IOException e) {
      return Observable.error(e);
    }

    return Observable.just(someType);
  });
}

This sample is more complex; it writes data to disk and signals onError if the write fails. The basic idea remains the same, though: we don't want any of the code to execute until subscription.
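If it helps to see that idea without any RxJava machinery, here's a plain-Java analogy using Callable, where call() stands in for subscribe() and a thrown IOException stands in for onError. Everything in it is hypothetical (DeferredWrite, createValue(), and the in-memory list pretending to be a disk); it's a sketch of the timing, not of the real createSomeType().

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Plain-Java analogy only; the "disk" is just an in-memory list.
final class DeferredWrite {
  private final List<String> written = new ArrayList<>();

  List<String> written() {
    return written;
  }

  // Builds the work but does not run it; call() plays the role of subscribe().
  Callable<String> createValue(String value) {
    return () -> {
      if (value.isEmpty()) {
        // Surfaces only when call() runs, much like onError on subscription.
        throw new IOException("refusing to write an empty value");
      }
      written.add(value);
      return value;
    };
  }

  public static void main(String[] args) throws Exception {
    DeferredWrite db = new DeferredWrite();
    Callable<String> task = db.createValue("Some Value");
    System.out.println(db.written()); // prints "[]"
    task.call();
    System.out.println(db.written()); // prints "[Some Value]"
  }
}
```

Creating the task touches nothing. Both the write and any failure happen only when the task is actually run.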

There are multiple ways of attacking the above problems. defer() is just one of them, but I've found it handy.