Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJava testing : how to wait for all background tasks to complete

TLDR : I have background processing going on in RxJava Observables, I am in integration tests, I would like to be able to independently wait for that processing to finish to make sure that background processing started from one test does not interfere with another test.


Simplified, I hava a @RequestMapping method that does the following :

  • insert data in database
  • launch an asynchronous processing of that data (http calls via Feign, db updates)
  • returns nothing (HttpStatus.NO_CONTENT)

This asynchronous processing was previously done with a ThreadPoolTaskExecutor. We're going to transition to RxJava and would like to remove this ThreadPoolTaskExecutor and do the background processing with RxJava.

So quite naively for the moment I tried to do that instead:

Observable
    .defer(() -> Observable.just(call to long blocking method) 
    .subscribeOn(Schedulers.io())
    .subscribe();

The end goal is of course to, one step at a time, go down into "call to long blocking method" and use Observable all the way.

Now before that I would like to make my integration tests work first. I am testing this by doing a RestTemplate call to the mapping. As most of the work is asynchronous my call returns really fast. Now I would like to find a way to wait for the asynchronous processing to finish (to make sure it does not conflict with another test).

Before RxJava I would just count the tasks in the ThreadPoolTaskExecutor and wait till it would reach 0.

How can I do that with RxJava ?


What I tried :

  • I tried to make all my Schedulers immediate with an RxJavaSchedulersHook : this cause some sort of blocking somewhere, code execution stops just before my Feign calls (Feign uses RxJava under the hood)
  • I tried to count the tasks with an Rx RxJavaObservableExecutionHook : I tried retaining the subscriptions, and removing them when isSubcribed = false, but this didn't work at all (lots of subscribers, the count never goes down)
  • I tried to put an observeOn(immediate()) in the real production code. This seems to work, and I could inject the right scheduler for runtime/test phases, but I am not really keen on putting code just for testing purposes in my real production code.

I'm probably terribly wrong, or overcomplicating thing, so don't hesitate to correct my reasonning !

like image 846
Sébastien Nussbaumer Avatar asked Apr 29 '16 10:04

Sébastien Nussbaumer


People also ask

How to wait for subscription completion in RxJava?

Either create an RxJava lift operator that wraps (in the call method) the Subscriber in a parent Subscriber that has a waitForCompletion method. How you do the waiting is up to you (with a CountDownLatch for example).

How do you test asynchronous flow in RxJava?

One or more observers subscribe to it to receive emitted events. Typically, the observer and observables are executed in separate threads in an asynchronous fashion – that makes the code hard to test in a traditional way. Fortunately, RxJava provides a TestSubscriber class which gives us the ability to test asynchronous, event-driven flow.

How to use timer operator in RxJava?

Let's start with the Timer Operator of RxJava. Timer operator is used when we want to do something after a span of time that we specify. Let's understand Timer operator with an example. Here as we have passed 2 seconds into the Timer operator, it will go into the flatMap operator after 2 seconds.

How do you test ZIP in RxJava?

2. Testing RxJava – the Traditional Way Let's start with an example – we have a sequence of letters that we want to zip with a sequence of integers from 1 inclusive. Our test should assert that a subscriber that listens to events emitted by zipped observable receives letters zipped with integers.


3 Answers

How to you return HttpStatus.NO_CONTENT ?

@RequestMapping(value = "/")
public HttpStatus home() {
    Observable.defer(() -> Observable.just(longMethod())
              .subscribeOn(Schedulers.io())
              .subscribe();
    return HttpStatus.NO_CONTENT;
}

In this form, you can't know when the longMethod is finished.

If you wants to know when all async jobs are completed, you can return HttpStatus.NO_CONTENT when all jobs are completed, using Spring DefferedResult or using a TestSubscriber

PS: you can use Observable.fromCallable(() -> longMethod()); instead of Observable.defer(() -> Observable.just(longMethod()); if you want

Using DefferedResult

@RequestMapping(value = "/")
public DeferredResult<HttpStatus> index() {
    DeferredResult<HttpStatus> deferredResult = new DeferredResult<HttpStatus>();
    Observable.fromCallable(() -> longMethod())
            .subscribeOn(Schedulers.io())
            .subscribe(value -> {}, e -> deferredResult.setErrorResult(e.getMessage()), () -> deferredResult.setResult(HttpStatus.NO_CONTENT))
    return deferredResult;
}

Like this, if you call your method, you'll get your result only when your observable complete (so, when the longMethod is finished)

Using TestSubscriber

You'll have to inject a TestSubscriber and when ask him to wait/check the completion of your Observable :

@RequestMapping(value = "/")
public HttpStatus home() {
    Observable.defer(() -> Observable.just(longMethod())
              .subscribeOn(Schedulers.io())
              .subscribe(subscriber); // you'll have to inject this subscriber in your test
    return HttpStatus.NO_CONTENT;
}

and in your test :

 TestSubscriber subscriber = new TestSubscriber(); // you'll have to inject it into your controller
 // ....
 controller.home();
 subscriber.awaitTerminalEvent();
 subscriber.assertCompleted(); // check that no error occurred
like image 76
dwursteisen Avatar answered Oct 16 '22 06:10

dwursteisen


You could use a ExecutorServiceAdapter to bridge from the Spring ThreadPoolTaskExecutor to the ExecutorService in RX, and then do the same trick as before.

like image 2
Dave Syer Avatar answered Oct 16 '22 08:10

Dave Syer


A few month later in the game : my advice is simply "don't do that". RxJava is not really suited to this kind of job. Without going too much in detail having lots of "loose" Observable running in the background is not appropriate : depending on the volume of your requests you can easily fall into queue and memory issues, and more importantly what happens with all the scheduled and running tasks if the webserver crashes ? How do you restart that ?

Spring offers other better alternatives imho : Spring Batch, Spring Cloud Task, messaging with Spring Cloud Stream, so don't do as I did and just use the right tool for the right job.

Now If you really want to go the bad route :

  • Either return an SseEmmitter and consume only the first event from the SSE in the consumer service, and consume all events in your tests
  • Either create an RxJava lift operator that wraps (in the call method) the Subscriber in a parent Subscriber that has a waitForCompletion method. How you do the waiting is up to you (with a CountDownLatch for example). That subscriber would be added to a synchronized list (and removed from it once completed), and in your tests you could just iterate over the list and call waitForCompletion on each item of the list. It's not that complicated and I got it to work, but please, dont do that !
like image 1
Sébastien Nussbaumer Avatar answered Oct 16 '22 06:10

Sébastien Nussbaumer