I'm learning reactive programming with RxJava, and I want to consume emitted values concurrently, without each one blocking the others on a single execution thread.
Observable
    .interval(50, TimeUnit.MILLISECONDS)
    .take(5)
    .subscribe(new Action1<Long>() {
        @Override
        public void call(Long counter) {
            sleep(1000);
            System.out.println("Got: " + counter + " thread : " + Thread.currentThread().getName());
        }
    });
sleep(10000);
I get this output:
Got: 0 thread : RxComputationThreadPool-1
Got: 1 thread : RxComputationThreadPool-1
Got: 2 thread : RxComputationThreadPool-1
Got: 3 thread : RxComputationThreadPool-1
Got: 4 thread : RxComputationThreadPool-1
How do I handle each event asynchronously, like this?
Got: 0 thread : RxComputationThreadPool-1
Got: 1 thread : RxComputationThreadPool-2
Got: 2 thread : RxComputationThreadPool-3
Got: 3 thread : RxComputationThreadPool-4
Got: 4 thread : RxComputationThreadPool-5
In Rx, an observable represents concurrency [1], so to process notifications concurrently with respect to each other, you must project each notification into an observable.
flatMap is the asynchronous sequential composition operator. It projects each notification from a source observable into an observable, thus allowing you to process each input value concurrently. It then merges the results of each computation into a flattened observable sequence with non-overlapping notifications.
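As a rough sketch of how this might look for the code in the question, assuming RxJava 1.x (the version that has Action1) and Java 8 lambdas: each value is wrapped in its own inner observable, that inner observable is moved to a scheduler with subscribeOn, and flatMap merges the results. The class name FlatMapConcurrency and the helper slowWork are made up for illustration. Schedulers.io() is used here because the simulated work blocks; Schedulers.computation() would produce RxComputationThreadPool-N thread names, but its pool is bounded by the number of CPU cores.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.schedulers.Schedulers;

class FlatMapConcurrency {

    // Hypothetical stand-in for the slow work done in the question's subscriber.
    static String slowWork(Long counter) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "Got: " + counter + " thread : " + Thread.currentThread().getName();
    }

    // Processes five ticks concurrently and collects the results.
    static List<String> run() {
        return Observable
            .interval(50, TimeUnit.MILLISECONDS)
            .take(5)
            // Project each value into its own observable and run that
            // inner observable on a scheduler thread; flatMap merges them.
            .flatMap(counter -> Observable.just(counter)
                .subscribeOn(Schedulers.io())
                .map(FlatMapConcurrency::slowWork))
            .toList()
            // Block only so this demo can return the results to the caller.
            .toBlocking()
            .single();
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because the inner observables run concurrently, the merged results may arrive in a different order than the source emitted them, and the exact thread names depend on the scheduler. If the source order matters, concatMap keeps it, but it processes the inner observables one at a time.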
Addendum:
In the selector for flatMap, there are often multiple ways to create a concurrent observable, depending on the target platform. I don't know Java, but in .NET you would typically either use Observable.Start to introduce concurrency or an asynchronous method (async/await) to take advantage of native asynchrony, which is often preferable.
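On the Java side, one possible counterpart is sketched below, again assuming RxJava 1.x (1.0.15 or later, where Observable.fromCallable is available). Treating fromCallable plus subscribeOn as a loose analogue of .NET's Observable.Start is an assumption on my part, not an exact equivalence. The class name FromCallableSelector and the method compute are hypothetical.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.schedulers.Schedulers;

class FromCallableSelector {

    // Hypothetical blocking computation, standing in for real work.
    static String compute(long counter) throws InterruptedException {
        Thread.sleep(1000);
        return counter + " on " + Thread.currentThread().getName();
    }

    static List<String> run() {
        return Observable
            .interval(50, TimeUnit.MILLISECONDS)
            .take(5)
            // The selector wraps the blocking call in a deferred observable
            // and subscribes to it on an I/O scheduler thread.
            .flatMap(counter -> Observable
                .fromCallable(() -> compute(counter))
                .subscribeOn(Schedulers.io()))
            .toList()
            // Block only so this demo can return the results to the caller.
            .toBlocking()
            .single();
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The callable is not invoked until flatMap subscribes to the inner observable, and any exception it throws is delivered through onError rather than escaping the selector.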
[1] Technically, an individual subscription (observer) for a cold observable enables concurrency in Rx, though it's often convenient to think in terms of observables instead. See this answer for more info.