Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Adding a Pool of Threads in a RxJava Flow

I was wondering if I can use the RxJava library in order to add some concurrency in the following use-case:

  • Fetch sequentially a String column from an existing ResultSet with a custom Observable (something like ResultSetObservable.create(resultSet))
  • Invoking a Web Service for each of these value (with a InvokeWebServiceFunc1<String, Pair<String, Integer>>() instance, for example) in order to retrieve some statistiques related to the String (note that the String in the Pair is the same as the one passed in input)
  • Print everything in a CSV format (with a ExportAsCSVAction1<Pair<String, Integer>>(PrintStream printStream)).

So here is what I have:

ResultSetObservable.create(resultSet)
    .map(new InvokeWebServiceFunc1<String, Pair<String, Integer>>())
    .subscribe(new ExportAsCSVAction1<Pair<String, Integer>>(System.out));

It works well but as the Web Service may take some time for some of the String input, I want to add some concurrency by having a thread pool like behavior for the mapping (of 10 threads for example) but I need the ExportAsCSVAction0 to be called in the same thread (and actually the current thread would be perfect).

Can you please help me? I can't figure if using the toBlocking().forEach() pattern is the right solution here and I do not understand where to use the Schedulers.from(fixedThreadPool) (in the observeOn() or in the subscribeOn()).

Thank you for any help!

like image 321
radium226 Avatar asked Dec 09 '14 10:12

radium226


People also ask

How do you implement a thread pool?

To use thread pools, we first create a object of ExecutorService and pass a set of tasks to it. ThreadPoolExecutor class allows to set the core and maximum pool size. The runnables that are run by a particular thread are executed sequentially.

How many ways we can create thread pool?

We can create the following 5 types of thread pool executors with pre-built methods in java. util. concurrent. Executors interface.

Is RxJava multithreaded?

RxJava is NOT Multi-Threaded by Default RxJava, by default, is not multi-threaded in any way. The definition given for RxJava on their official website is as follows: A library for composing asynchronous and event-based programs using observable sequences for the Java VM.

What is a pool in multithreading?

A thread pool manages a set of anonymous threads that perform work on request. The threads do not terminate right away. When one of the threads completes a task, the thread becomes idle, ready to be dispatched to another task.


1 Answers

I found it by myself!

package radium.rx;

import java.util.List;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.schedulers.Schedulers;

public class TryRx {

    public static Random RANDOM = new Random();

    public static void main(String[] arguments) throws Throwable {
        final List<Integer> inputs = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
        final ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(2);

        Iterable<Integer> outputs = Observable.<Integer>from(inputs)
                .flatMap((Integer input) -> deferHeavyWeightStuff(input).subscribeOn(Schedulers.from(threadPoolExecutor)))
                .toBlocking()
            .toIterable();

        for (Integer output : outputs) {
            System.out.println(output);
        }

        threadPoolExecutor.shutdown();
    }

    public static void sleepQuietly(int duration, TimeUnit unit) {
        try {
            Thread.sleep(unit.toMillis(duration));
        } catch (InterruptedException e) {

        }
    }

    public static Observable<Integer> deferHeavyWeightStuff(final int input) {
        return Observable.defer(() -> Observable.just(doHeavyWeightStuff(input)));
    }

    public static int randomInt(int min, int max) {
        return RANDOM.nextInt((max - min) + 1) + min;
    }

    public static int doHeavyWeightStuff(int input) {
        sleepQuietly(randomInt(1, 5), TimeUnit.SECONDS);
        int output = (int) Math.pow(input, 2);
        return output;
    }

}

Cheers!

like image 186
radium226 Avatar answered Sep 21 '22 13:09

radium226