
Why does my RxJava Observable not emit or complete unless it's blocking?

Background

I have a number of RxJava Observables (either generated from Jersey clients, or stubs using Observable.just(someObject)). All of them should emit exactly one value. I have a component test that mocks out all the Jersey clients and uses Observable.just(someObject), and I see the same behaviour there as when running the production code.

I have several classes that act upon these observables, perform some calculations (and some side effects, which I might turn into direct return values later), and return empty Observable<Void> instances.

At one point, in one such class, I'm trying to zip several of my source observables up and then map them - something like the below:

public Observable<Void> doCalculation() {
    return Observable.zip(
        getObservable1(),
        getObservable2(),
        getObservable3(),
        UnifyingObject::new
    ).concatMap(unifyingObject -> unifyingObject.processToNewObservable());
}

// in Unifying Object
public Observable<Void> processToNewObservable() {
    // ... do some calculation ...
    return Observable.empty();
}

The calculating classes are then all combined and waited on:

// Wait for rule computations to complete
List<Observable<Void>> calculations = ...;
Observable.zip(calculations, results -> results)
        .toBlocking().lastOrDefault(null);

The problem

The trouble is, processToNewObservable() is never being executed. By process of elimination, I can see it's getObservable1() that's the trouble - if I replace it with Observable.just(null), everything's executed as I'd imagine (but with a null value where I want a real one).

To reiterate, getObservable1() returns an Observable from a Jersey client in production code, but that client is a Mockito mock returning Observable.just(someValue) in my test.

Investigation

If I convert getObservable1() to blocking, then wrap the first value in just(), again, everything executes as I'd imagine (but I don't want to introduce the blocking step):

Observable.zip(
    Observable.just(getObservable1().toBlocking().first()),
    getObservable2(),
    getObservable3(),
    UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable());

My first thought was that something else might be consuming the value emitted by my observable, so that zip saw an already-completed source and concluded that the zipped result should be an empty observable. I've tried adding .cache() to every observable source I can think of that might be related, but that hasn't changed the behaviour.
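The hypothesis above relies on how zip pairs values: it can only call its combiner when every source has supplied a value, so a source that completes without emitting produces an empty result. A rough way to picture that, using plain JDK lists as a stand-in for observables (ZipModel and its methods are hypothetical names, and this is not the RxJava implementation), might be the following. It only illustrates the semantics; it does not show that this is what is happening in the code in question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical model of zip semantics over lists; NOT the RxJava API.
final class ZipModel {
    // Pairs elements positionally and stops as soon as either input runs out.
    static <A, B, R> List<R> zip(List<A> left, List<B> right, BiFunction<A, B, R> combiner) {
        List<R> out = new ArrayList<>();
        Iterator<A> l = left.iterator();
        Iterator<B> r = right.iterator();
        while (l.hasNext() && r.hasNext()) {
            out.add(combiner.apply(l.next(), r.next()));
        }
        return out;
    }

    // Both sources supply one value, so the combiner runs once.
    static List<String> bothEmit() {
        List<String> letters = Arrays.asList("a");
        List<Integer> numbers = Arrays.asList(1);
        return zip(letters, numbers, (s, n) -> s + n);
    }

    // One source is empty, so the combiner never runs.
    static List<String> oneEmpty() {
        List<String> letters = Collections.emptyList();
        List<Integer> numbers = Arrays.asList(1);
        return zip(letters, numbers, (s, n) -> s + n);
    }

    public static void main(String[] args) {
        System.out.println(bothEmit()); // [a1]
        System.out.println(oneEmpty()); // []
    }
}
```

If the real source behaved like the empty list here, toBlocking().first() on it would be expected to fail rather than return a value, which is one way to tell the two cases apart.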

I've also tried adding next / error / complete / finally handlers on getObservable1 (without converting it to blocking) just before the zip, but none of them executed either:

getObservable1()
    .doOnNext(...)
    .doOnCompleted(...)
    .doOnError(...)
    .finallyDo(...);

Observable.zip(
    getObservable1(),
    getObservable2(),
    getObservable3(),
    UnifyingObject::new
).concatMap(unifyingObject -> unifyingObject.processToNewObservable());
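One thing worth checking in the snippet above: RxJava operators such as doOnNext return a new Observable rather than modifying the one they are called on. As written, the chain with the handlers is discarded and zip receives a fresh getObservable1() call, so those handlers would not fire whatever the source does. The sketch below models that behaviour with a tiny JDK-only stand-in (LazySource and DiscardedChainDemo are hypothetical names, not RxJava classes), assuming a cold source that emits one value per subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a cold, single-value Observable; NOT the RxJava API.
final class LazySource<T> {
    private final Consumer<Consumer<T>> onSubscribe;

    private LazySource(Consumer<Consumer<T>> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    // Emits the value to each subscriber at subscription time.
    static <T> LazySource<T> just(T value) {
        return new LazySource<>(subscriber -> subscriber.accept(value));
    }

    // Returns a NEW source with the side effect attached; `this` is unchanged.
    LazySource<T> doOnNext(Consumer<T> sideEffect) {
        return new LazySource<>(subscriber -> onSubscribe.accept(value -> {
            sideEffect.accept(value);
            subscriber.accept(value);
        }));
    }

    void subscribe(Consumer<T> subscriber) {
        onSubscribe.accept(subscriber);
    }
}

final class DiscardedChainDemo {
    // Attaches a handler, throws the result away, then subscribes to the original.
    static List<String> discarded() {
        List<String> log = new ArrayList<>();
        LazySource<String> source = LazySource.just("value");
        source.doOnNext(v -> log.add("saw " + v)); // returned source is never used
        source.subscribe(v -> { });
        return log;
    }

    // Subscribes to the source that doOnNext returned.
    static List<String> kept() {
        List<String> log = new ArrayList<>();
        LazySource<String> source = LazySource.just("value")
                .doOnNext(v -> log.add("saw " + v));
        source.subscribe(v -> { });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(discarded()); // []
        System.out.println(kept());      // [saw value]
    }
}
```

Applied to the question, this suggests assigning the result of the doOn... chain to a variable and passing that variable to zip, so the diagnostics sit on the same chain that actually gets subscribed.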

The question

I'm very new to RxJava, so I'm pretty sure I'm missing something fundamental. The question is: what stupid thing could I be doing? If that's not obvious from what I've said so far, what can I do to help diagnose the issue?

Asked by Rowan, Dec 13 '15 23:12


1 Answer

The Observable must emit to start the chain. You have to think of your pipeline as a declaration of what will happen when the Observable emits.

You didn't share what was actually being observed, but Observable.just() makes the Observable emit the wrapped object as soon as it is subscribed to.
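The "declaration" idea can be seen with a JDK analogy. The sketch below uses java.util.stream.Stream rather than RxJava (LazyPipelineDemo is a hypothetical name): intermediate operations are only recorded when the pipeline is built, and they run when a terminal operation is called, much as an Observable chain does nothing until something subscribes to it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// JDK Stream analogy for a lazily evaluated pipeline; NOT RxJava.
final class LazyPipelineDemo {
    static List<String> run() {
        List<String> events = new ArrayList<>();

        // Declaring the pipeline records the steps but runs none of them.
        Stream<Integer> pipeline = Stream.of(1, 2, 3)
                .peek(n -> events.add("peek " + n))
                .map(n -> n * 10);
        events.add("declared");

        // The terminal operation is what actually pulls values through.
        List<Integer> result = pipeline.collect(Collectors.toList());
        events.add("result " + result);
        return events;
    }

    public static void main(String[] args) {
        // [declared, peek 1, peek 2, peek 3, result [10, 20, 30]]
        System.out.println(run());
    }
}
```

Note that "declared" is logged before any "peek" entry, even though peek appears earlier in the source.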

Answered by John Scattergood, Sep 21 '22 13:09