Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJava and parallel execution of observer code

I am having the following code using RxJava Observable api :

Observable<Info> observable = fileProcessor.processFileObservable(processedFile.getAbsolutePath());     observable       .buffer(10000)       .observeOn(Schedulers.computation())       .subscribe(recordInfo -> {         _logger.info("Running stage2 on thread with id : " + Thread.currentThread().getId());           for(Info info : recordInfo) {             // some I/O operation logic          }       },        exception -> {       },        () -> {       }); 

My expectation was that the observation code i.e. code inside the subscribe() method will be executed in parallel after I have specified the computation scheduler. Instead the code is still being executed sequentially on single thread. How can make the code run in parallel using RxJava api.

like image 781
Pawan Mishra Avatar asked Feb 16 '16 07:02

Pawan Mishra


People also ask

Is RxJava parallel?

RxJava 2.0. 5 introduced parallel flows and ParallelFlowable, which makes parallel execution simpler and more declarative.

What is Observable and observer in RxJava?

An Observable is like a speaker that emits the value. It does some work and emits some values. An Operator is like a translator which translates/modifies data from one form to another form. An Observer gets those values.

Is RxJava asynchronous?

Usually, asynchronous code is non-blocking: You call a method that returns immediately, allowing your code to continue its execution. Once the result of your call is available, it is returned via a callback. RxJava is asynchronous, too.

How do you make an observer in RxJava?

create(.. target) where you could likely have your listen() implementation to call the target's onnext/onerror/oncomplete. Of course, there is much code to add for when the subscriber unsubscribes (if that) so that the listeners can be removed. But that's a start.


2 Answers

RxJava is often misunderstood when it comes to the asynchronous/multithreaded aspects of it. The coding of multithreaded operations is simple, but understanding the abstraction is another thing.

A common question about RxJava is how to achieve parallelization, or emitting multiple items concurrently from an Observable. Of course, this definition breaks the Observable Contract which states that onNext() must be called sequentially and never concurrently by more than one thread at a time.

To achieve parallelism you need multiple Observables.

This runs in a single thread:

Observable<Integer> vals = Observable.range(1,10);  vals.subscribeOn(Schedulers.computation())           .map(i -> intenseCalculation(i))           .subscribe(val -> System.out.println("Subscriber received "                   + val + " on "                   + Thread.currentThread().getName())); 

This runs in multiple threads:

Observable<Integer> vals = Observable.range(1,10);  vals.flatMap(val -> Observable.just(val)             .subscribeOn(Schedulers.computation())             .map(i -> intenseCalculation(i)) ).subscribe(val -> System.out.println(val)); 

Code and text comes from this blog post.

like image 95
LordRaydenMK Avatar answered Oct 19 '22 19:10

LordRaydenMK


RxJava 2.0.5 introduced parallel flows and ParallelFlowable, which makes parallel execution simpler and more declarative.

You no longer have to create Observable/Flowable within flatMap, you can simply call parallel() on Flowable and it returns ParallelFlowable.

It's not as feature rich as a regular Flowable, because concurrency raises many issues with Rx contracts, but you have basic map(), filter() and many more, which should be enough in most cases.

So instead of this flow from @LordRaydenMK answer:

Observable<Integer> vals = Observable.range(1,10);  vals.flatMap(val -> Observable.just(val)         .subscribeOn(Schedulers.computation())         .map(i -> intenseCalculation(i))     ).subscribe(val -> System.out.println(val)); 

Now you can do:

Flowable<Integer> vals = Flowable.range(1, 10);  vals.parallel()         .runOn(Schedulers.computation())         .map(i -> intenseCalculation(i))         .sequential()         .subscribe(val -> System.out.println(val)); 
like image 35
michalbrz Avatar answered Oct 19 '22 18:10

michalbrz