I have the following code using the RxJava Observable API:
Observable<Info> observable = fileProcessor.processFileObservable(processedFile.getAbsolutePath());
observable
    .buffer(10000)
    .observeOn(Schedulers.computation())
    .subscribe(recordInfo -> {
        _logger.info("Running stage2 on thread with id : " + Thread.currentThread().getId());
        for (Info info : recordInfo) {
            // some I/O operation logic
        }
    },
    exception -> { },
    () -> { });
My expectation was that the observer code, i.e. the code inside the subscribe() call, would execute in parallel once I specified the computation scheduler. Instead, it still executes sequentially on a single thread. How can I make this code run in parallel using the RxJava API?
An Observable is like a speaker that emits the value. It does some work and emits some values. An Operator is like a translator which translates/modifies data from one form to another form. An Observer gets those values.
Usually, asynchronous code is non-blocking: You call a method that returns immediately, allowing your code to continue its execution. Once the result of your call is available, it is returned via a callback. RxJava is asynchronous, too.
RxJava is often misunderstood when it comes to the asynchronous/multithreaded aspects of it. The coding of multithreaded operations is simple, but understanding the abstraction is another thing.
A common question about RxJava is how to achieve parallelization, or emitting multiple items concurrently from an Observable. Of course, this definition breaks the Observable Contract which states that onNext() must be called sequentially and never concurrently by more than one thread at a time.
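The sequential contract is also why observeOn() alone does not spread work across threads: it moves delivery onto a single worker of the scheduler. The following is a minimal sketch of that behaviour, assuming RxJava 2.x (the io.reactivex package) is on the classpath; the class and method names are made up for illustration.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ObserveOnSingleThreadDemo {

    // Hypothetical helper: records the name of every thread that receives an item
    // downstream of observeOn().
    static Set<String> threadsSeenWithObserveOn(int count) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        Observable.range(1, count)
                .observeOn(Schedulers.computation())
                .doOnNext(i -> names.add(Thread.currentThread().getName()))
                .blockingSubscribe(); // wait for completion so the set is fully populated
        return names;
    }

    public static void main(String[] args) {
        // All items are delivered by one computation worker thread.
        System.out.println("Threads used: " + threadsSeenWithObserveOn(1000));
    }
}
```

The set contains one thread name regardless of how many items are emitted, which matches what the question observes.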
To achieve parallelism you need multiple Observables.
This runs in a single thread:
Observable<Integer> vals = Observable.range(1, 10);
vals.subscribeOn(Schedulers.computation())
    .map(i -> intenseCalculation(i))
    .subscribe(val -> System.out.println("Subscriber received " + val + " on " + Thread.currentThread().getName()));
This runs in multiple threads:
Observable<Integer> vals = Observable.range(1, 10);
vals.flatMap(val -> Observable.just(val)
        .subscribeOn(Schedulers.computation())
        .map(i -> intenseCalculation(i))
    ).subscribe(val -> System.out.println(val));
The code and text come from this blog post.
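Applied to the question, the same flatMap pattern can wrap each buffered batch in its own inner Observable. This is only a sketch, assuming RxJava 2.x: processBatch is a hypothetical stand-in for the per-batch I/O logic, integers replace the Info records, and Schedulers.io() is chosen here because the question describes I/O work.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class BufferedParallelExample {

    // Hypothetical stand-in for the I/O work done on each batch in the question.
    static int processBatch(List<Integer> batch) {
        int sum = 0;
        for (int value : batch) {
            sum += value;
        }
        return sum;
    }

    // Splits 1..count into batches and processes each batch on its own inner Observable.
    // Thread names are collected only to show where the work ran.
    static List<Integer> processInBatches(int count, int batchSize, Set<String> threadNames) {
        return Observable.range(1, count)
                .buffer(batchSize)
                .flatMap(batch -> Observable.just(batch)
                        .subscribeOn(Schedulers.io())
                        .map(b -> {
                            threadNames.add(Thread.currentThread().getName());
                            return processBatch(b);
                        }))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        List<Integer> sums = processInBatches(100, 10, threadNames);
        System.out.println("Batch sums (arrival order may vary): " + sums);
        System.out.println("Threads used: " + threadNames);
    }
}
```

Because flatMap merges results as they arrive, the batch results can come back in a different order than the batches were emitted.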
RxJava 2.0.5 introduced parallel flows and ParallelFlowable, which makes parallel execution simpler and more declarative.
You no longer have to create Observable/Flowable within flatMap, you can simply call parallel() on Flowable and it returns ParallelFlowable.
It's not as feature rich as a regular Flowable, because concurrency raises many issues with Rx contracts, but you have basic map(), filter() and many more, which should be enough in most cases.
So instead of this flow from @LordRaydenMK's answer:
Observable<Integer> vals = Observable.range(1, 10);
vals.flatMap(val -> Observable.just(val)
        .subscribeOn(Schedulers.computation())
        .map(i -> intenseCalculation(i))
    ).subscribe(val -> System.out.println(val));
Now you can do:
Flowable<Integer> vals = Flowable.range(1, 10);
vals.parallel()
    .runOn(Schedulers.computation())
    .map(i -> intenseCalculation(i))
    .sequential()
    .subscribe(val -> System.out.println(val));
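For something you can run from a main method, a self-contained variant might look like the sketch below. It assumes RxJava 2.0.5 or later; intenseCalculation is a made-up placeholder (a short sleep followed by squaring), and the class and method names are illustrative only.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;

public class ParallelFlowableExample {

    // Hypothetical placeholder for an expensive computation.
    static int intenseCalculation(int i) throws InterruptedException {
        Thread.sleep(50);
        return i * i;
    }

    // Runs the calculation on parallel rails, merges the rails back into one
    // Flowable, and sorts the merged results.
    static List<Integer> squaresInParallel(int count) {
        return Flowable.range(1, count)
                .parallel()                       // split into rails (defaults to the CPU count)
                .runOn(Schedulers.computation())  // give each rail its own worker
                .map(i -> intenseCalculation(i))
                .sequential()                     // merge rails back into a single Flowable
                .toSortedList()                   // merged order is not guaranteed, so sort
                .blockingGet();                   // block until every rail has finished
    }

    public static void main(String[] args) {
        System.out.println(squaresInParallel(10));
    }
}
```

The blocking call at the end matters in a standalone program: the computation scheduler uses daemon threads, so a plain subscribe() at the end of main can let the JVM exit before any result is printed.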