TLDR : I have background processing going on in RxJava Observables, I am in integration tests, I would like to be able to independently wait for that processing to finish to make sure that background processing started from one test does not interfere with another test.
Simplified, I hava a @RequestMapping
method that does the following :
HttpStatus.NO_CONTENT
)This asynchronous processing was previously done with a ThreadPoolTaskExecutor
. We're going to transition to RxJava and would like to remove this ThreadPoolTaskExecutor and do the background processing with RxJava.
So quite naively for the moment I tried to do that instead:
Observable
.defer(() -> Observable.just(call to long blocking method)
.subscribeOn(Schedulers.io())
.subscribe();
The end goal is of course to, one step at a time, go down into "call to long blocking method" and use Observable all the way.
Now before that I would like to make my integration tests work first. I am testing this by doing a RestTemplate call to the mapping. As most of the work is asynchronous my call returns really fast. Now I would like to find a way to wait for the asynchronous processing to finish (to make sure it does not conflict with another test).
Before RxJava I would just count the tasks in the ThreadPoolTaskExecutor and wait till it would reach 0.
How can I do that with RxJava ?
What I tried :
I'm probably terribly wrong, or overcomplicating thing, so don't hesitate to correct my reasonning !
Either create an RxJava lift operator that wraps (in the call method) the Subscriber in a parent Subscriber that has a waitForCompletion method. How you do the waiting is up to you (with a CountDownLatch for example).
One or more observers subscribe to it to receive emitted events. Typically, the observer and observables are executed in separate threads in an asynchronous fashion – that makes the code hard to test in a traditional way. Fortunately, RxJava provides a TestSubscriber class which gives us the ability to test asynchronous, event-driven flow.
Let's start with the Timer Operator of RxJava. Timer operator is used when we want to do something after a span of time that we specify. Let's understand Timer operator with an example. Here as we have passed 2 seconds into the Timer operator, it will go into the flatMap operator after 2 seconds.
2. Testing RxJava – the Traditional Way Let's start with an example – we have a sequence of letters that we want to zip with a sequence of integers from 1 inclusive. Our test should assert that a subscriber that listens to events emitted by zipped observable receives letters zipped with integers.
How to you return HttpStatus.NO_CONTENT
?
@RequestMapping(value = "/")
public HttpStatus home() {
Observable.defer(() -> Observable.just(longMethod())
.subscribeOn(Schedulers.io())
.subscribe();
return HttpStatus.NO_CONTENT;
}
In this form, you can't know when the longMethod
is finished.
If you wants to know when all async jobs are completed, you can return HttpStatus.NO_CONTENT
when all jobs are completed, using Spring DefferedResult
or using a TestSubscriber
PS: you can use Observable.fromCallable(() -> longMethod());
instead of Observable.defer(() -> Observable.just(longMethod());
if you want
@RequestMapping(value = "/")
public DeferredResult<HttpStatus> index() {
DeferredResult<HttpStatus> deferredResult = new DeferredResult<HttpStatus>();
Observable.fromCallable(() -> longMethod())
.subscribeOn(Schedulers.io())
.subscribe(value -> {}, e -> deferredResult.setErrorResult(e.getMessage()), () -> deferredResult.setResult(HttpStatus.NO_CONTENT))
return deferredResult;
}
Like this, if you call your method, you'll get your result only when your observable complete (so, when the longMethod
is finished)
You'll have to inject a TestSubscriber
and when ask him to wait/check the completion of your Observable :
@RequestMapping(value = "/")
public HttpStatus home() {
Observable.defer(() -> Observable.just(longMethod())
.subscribeOn(Schedulers.io())
.subscribe(subscriber); // you'll have to inject this subscriber in your test
return HttpStatus.NO_CONTENT;
}
and in your test :
TestSubscriber subscriber = new TestSubscriber(); // you'll have to inject it into your controller
// ....
controller.home();
subscriber.awaitTerminalEvent();
subscriber.assertCompleted(); // check that no error occurred
You could use a ExecutorServiceAdapter
to bridge from the Spring ThreadPoolTaskExecutor
to the ExecutorService
in RX, and then do the same trick as before.
A few month later in the game : my advice is simply "don't do that". RxJava
is not really suited to this kind of job. Without going too much in detail having lots of "loose" Observable running in the background is not appropriate : depending on the volume of your requests you can easily fall into queue and memory issues, and more importantly what happens with all the scheduled and running tasks if the webserver crashes ? How do you restart that ?
Spring offers other better alternatives imho : Spring Batch, Spring Cloud Task, messaging with Spring Cloud Stream, so don't do as I did and just use the right tool for the right job.
Now If you really want to go the bad route :
lift
operator that wraps (in the call
method) the Subscriber in a parent Subscriber that has a waitForCompletion
method. How you do the waiting is up to you (with a CountDownLatch
for example). That subscriber would be added to a synchronized list (and removed from it once completed), and in your tests you could just iterate over the list and call waitForCompletion
on each item of the list. It's not that complicated and I got it to work, but please, dont do that !If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With