
Consuming values concurrently emitted by an Observable

I'm learning reactive programming with RxJava, and I want to consume emitted values concurrently instead of blocking on a single execution thread.

    Observable
        .interval(50, TimeUnit.MILLISECONDS)
        .take(5)
        .subscribe(new Action1<Long>() {
            @Override
            public void call(Long counter) {
                // Simulate slow work for each emitted value.
                sleep(1000);
                System.out.println("Got: " + counter + " thread : " + Thread.currentThread().getName());
            }
        });

    // Keep the calling thread alive while the values are processed.
    sleep(10000);

I get this output:

Got: 0 thread : RxComputationThreadPool-1
Got: 1 thread : RxComputationThreadPool-1
Got: 2 thread : RxComputationThreadPool-1
Got: 3 thread : RxComputationThreadPool-1
Got: 4 thread : RxComputationThreadPool-1

How do I handle each event asynchronously, so that the output looks like this?

Got: 0 thread : RxComputationThreadPool-1
Got: 1 thread : RxComputationThreadPool-2
Got: 2 thread : RxComputationThreadPool-3
Got: 3 thread : RxComputationThreadPool-4
Got: 4 thread : RxComputationThreadPool-5
vach asked Sep 29 '22 22:09



1 Answer

In Rx, an observable represents concurrency [1], so to process notifications concurrently with respect to each other, you must project each notification into an observable.

flatMap is the asynchronous sequential composition operator. It projects each notification from a source observable into an observable, thus allowing you to process each input value concurrently. It then merges the results of each computation into a flattened observable sequence with non-overlapping notifications.
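As a rough illustration of the flatMap approach applied to the code in the question, here is a minimal sketch. It assumes RxJava 1.x on the classpath (the `rx.*` packages, matching the `Action1` used in the question) and Java 8 lambdas. The class name `ConcurrentConsume` and the helpers `handle` and `process` are hypothetical, and `Schedulers.computation()` is just one possible scheduler choice.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.schedulers.Schedulers;

public class ConcurrentConsume {

    // Hypothetical helper: simulates slow work on one value and
    // reports which thread performed it.
    static String handle(Long counter) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "Got: " + counter + " thread : " + Thread.currentThread().getName();
    }

    // Projects each emitted value into its own inner observable that is
    // subscribed on a scheduler, so the slow work for different values
    // can overlap. flatMap merges the inner results into one sequence.
    static List<String> process() {
        return Observable
            .interval(50, TimeUnit.MILLISECONDS)
            .take(5)
            .flatMap(counter ->
                Observable.just(counter)
                    .subscribeOn(Schedulers.computation())
                    .map(ConcurrentConsume::handle))
            .toList()
            .toBlocking()
            .single();
    }

    public static void main(String[] args) {
        // Blocks until all five values have been handled, then prints them.
        process().forEach(System.out::println);
    }
}
```

Because flatMap merges results as they complete, the order of the lines is not guaranteed to match the order of emission. The computation scheduler has a limited number of worker threads (typically tied to the number of CPU cores), so on a machine with few cores some values may share a thread; `Schedulers.io()` is an alternative when the work is blocking.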

Addendum:

In the selector for flatMap, there are often multiple ways to create a concurrent observable depending upon the target platform. I don't know Java, but in .NET you would typically either use Observable.Start to introduce concurrency or an asynchronous method (async/await) to take advantage of native asynchrony, which is often preferable.
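For Java readers, one rough counterpart to starting an asynchronous computation per value is the standard library's `CompletableFuture`. The sketch below is plain Java rather than Rx, and treating it as an analogue of the .NET options mentioned above is an assumption, not something the answer states. The class name `AsyncFanOut` and its methods are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class AsyncFanOut {

    // Hypothetical helper: simulates slow work on one value and
    // reports which thread performed it.
    static String handle(long counter) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "Got: " + counter + " thread : " + Thread.currentThread().getName();
    }

    static List<String> process(int count) {
        // Start one asynchronous computation per value; supplyAsync runs
        // each on the default executor without blocking the caller.
        List<CompletableFuture<String>> pending = LongStream.range(0, count)
            .mapToObj(i -> CompletableFuture.supplyAsync(() -> handle(i)))
            .collect(Collectors.toList());

        // Wait for every computation and collect the results in input order.
        return pending.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        process(5).forEach(System.out::println);
    }
}
```

Unlike flatMap, this version waits for all computations and returns the results in input order rather than in completion order.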

[1] Technically, an individual subscription (observer) for a cold observable enables concurrency in Rx, though it's often convenient to think in terms of observables instead. See this answer for more info.

Dave Sexton answered Oct 06 '22 18:10
