How to execute a consumer in parallel in RxJava2?

I have a question about RxJava2. I want the consumer to run on different threads of a fixed thread pool, so that the items of a List are processed in parallel. Here is my code:

    List<String> letters = Lists.newArrayList("a","b","c","d","e","f","g");
    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(letters.size());
    Observable.fromIterable(letters).observeOn(Schedulers.from(fixedThreadPool)).forEach(new Consumer<String>() {
        @Override
        public void accept(String data) throws Exception {
            System.out.println(data + " forEach, thread is " + Thread.currentThread().getName());
        }
    });

The result I got is:

a forEach, thread is pool-1-thread-1
b forEach, thread is pool-1-thread-1
c forEach, thread is pool-1-thread-1
d forEach, thread is pool-1-thread-1
e forEach, thread is pool-1-thread-1
f forEach, thread is pool-1-thread-1
g forEach, thread is pool-1-thread-1

But what I actually want is the following result, with each consumer executing in parallel on a different thread:

a forEach, thread is pool-1-thread-1
b forEach, thread is pool-1-thread-2
c forEach, thread is pool-1-thread-3
d forEach, thread is pool-1-thread-4
e forEach, thread is pool-1-thread-5
f forEach, thread is pool-1-thread-6
g forEach, thread is pool-1-thread-7

Can somebody tell me how to make it happen?

Coral asked Mar 09 '23 08:03


1 Answer

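To process the items on parallel threads, use Flowable instead of Observable, since it provides the parallel operator. observeOn hands every item to a single worker of the scheduler, which is why your consumer always ran on pool-1-thread-1. For instance: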

    Flowable.fromIterable(letters)
            .parallel(letters.size())
            .runOn(Schedulers.from(fixedThreadPool))
            .sequential()
            .forEach(data -> System.out.println(data + " forEach, thread is " +
                             Thread.currentThread().getName()));

Because you cannot predict which thread will handle each item, the output order may vary between runs. In my test case I got:

c forEach, thread is pool-1-thread-3
g forEach, thread is pool-1-thread-7
a forEach, thread is pool-1-thread-1
e forEach, thread is pool-1-thread-5
d forEach, thread is pool-1-thread-4
b forEach, thread is pool-1-thread-2
f forEach, thread is pool-1-thread-6
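
For comparison, the one-thread-per-item behaviour the question asks for can also be sketched without RxJava, using only java.util.concurrent. This is not the Flowable approach above but a swapped-in plain ExecutorService fan-out, and the class name PoolFanOut and the method threadPerItem are hypothetical names chosen for illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper class, not part of RxJava or of the original answer.
public class PoolFanOut {

    // Submits one task per item to a fixed pool and records the name of the
    // thread that ran each task. Results are keyed in submission order.
    static Map<String, String> threadPerItem(List<String> items) throws Exception {
        // Assumption: pool size equals the item count, as in the question.
        // Math.max guards against an empty list (a pool size of 0 is rejected).
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, items.size()));
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < items.size(); i++) {
                Callable<String> task = () -> Thread.currentThread().getName();
                futures.add(pool.submit(task));
            }
            Map<String, String> result = new LinkedHashMap<>();
            for (int i = 0; i < items.size(); i++) {
                // get() blocks until the task for this item has finished.
                result.put(items.get(i), futures.get(i).get());
            }
            return result;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> letters = Arrays.asList("a", "b", "c", "d", "e", "f", "g");
        threadPerItem(letters).forEach((letter, thread) ->
                System.out.println(letter + " handled on " + thread));
    }
}
```

With a fresh fixed pool whose size equals the number of items, each submission should start a new worker thread, so every letter ends up on its own thread. Because the results are collected in submission order, the printed lines are ordered by letter, unlike the interleaved output shown earlier.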

For more information, please consult the parallel-flows section of the RxJava Wiki.

Zapodot answered Mar 19 '23 01:03