I was wondering if I can use the RxJava library to add some concurrency to the following use case:
1. Fetch a String column sequentially from an existing ResultSet with a custom Observable (something like ResultSetObservable.create(resultSet)).
2. Invoke a Web Service for each of these values (with an InvokeWebServiceFunc1<String, Pair<String, Integer>>() instance, for example) in order to retrieve some statistics related to the String (note that the String in the Pair is the same as the one passed as input).
3. Print everything in CSV format (with an ExportAsCSVAction1<Pair<String, Integer>>(PrintStream printStream)).
So here is what I have:
ResultSetObservable.create(resultSet)
.map(new InvokeWebServiceFunc1<String, Pair<String, Integer>>())
.subscribe(new ExportAsCSVAction1<Pair<String, Integer>>(System.out));
It works well, but since the Web Service may take some time for some of the String inputs, I want to add some concurrency by giving the mapping a thread-pool-like behavior (10 threads, for example). However, I need the ExportAsCSVAction1 to always be called on the same thread (and the current thread would actually be perfect).
Can you please help me? I can't figure out whether the toBlocking().forEach() pattern is the right solution here, and I do not understand where to use Schedulers.from(fixedThreadPool) (in observeOn() or in subscribeOn()).
Thank you for any help!
To use a thread pool, we first create an ExecutorService object and submit a set of tasks to it. The ThreadPoolExecutor class lets us set the core and maximum pool size. The runnables assigned to a particular thread are executed sequentially.
We can create several kinds of thread pool executors with the pre-built factory methods of the java.util.concurrent.Executors class, such as newFixedThreadPool, newCachedThreadPool, and newSingleThreadExecutor.
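The thread pool usage described above can be sketched with the JDK alone. This is only an illustration, not part of the original question or answer: the class name FixedPoolSketch and the method squareAll are hypothetical, and squaring a number stands in for whatever slow work the tasks would really do.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical example class, not taken from the original post.
public class FixedPoolSketch {

    // Squares each input on a fixed-size pool and returns the results in input order.
    public static List<Integer> squareAll(List<Integer> inputs, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (final Integer input : inputs) {
                tasks.add(() -> input * input);
            }
            List<Integer> results = new ArrayList<>();
            // invokeAll blocks until every task is done and returns the
            // futures in the same order as the submitted task list.
            for (Future<Integer> future : pool.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            // Without shutdown() the pool's worker threads keep the JVM alive.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(squareAll(Arrays.asList(1, 2, 3, 4), 2)); // prints [1, 4, 9, 16]
    }
}
```

The tasks run on the pool's threads, while the results are collected on the calling thread, which is the same split the question asks for.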
RxJava is not multi-threaded by default: unless a Scheduler is specified, an Observable chain does its work on the thread that subscribes to it. The definition given for RxJava on its official website is as follows: "A library for composing asynchronous and event-based programs using observable sequences for the Java VM."
A thread pool manages a set of anonymous threads that perform work on request. The threads do not terminate right away. When one of the threads completes a task, the thread becomes idle, ready to be dispatched to another task.
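The reuse of idle threads can be observed with a small JDK-only sketch. The class name ThreadReuseSketch and the method workerNames are hypothetical names chosen for this illustration: it submits more tasks than there are threads and records which worker threads ran them.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical example class, not taken from the original post.
public class ThreadReuseSketch {

    // Runs taskCount short tasks on a pool of poolSize threads and returns
    // the names of the worker threads that actually executed them.
    public static Set<String> workerNames(int poolSize, int taskCount) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> names.add(Thread.currentThread().getName()));
        }
        // Stop accepting new tasks, then wait for the queued ones to finish.
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return names;
    }

    public static void main(String[] args) {
        Set<String> names = workerNames(2, 6);
        System.out.println(names.size() + " worker thread(s) ran 6 tasks: " + names);
    }
}
```

With a pool of 2 threads and 6 tasks, the set never holds more than 2 names, because each worker picks up the next queued task once it finishes the previous one.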
I found it by myself!
package radium.rx;

import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.schedulers.Schedulers;

public class TryRx {

    private static final Random RANDOM = new Random();

    public static void main(String[] arguments) throws Throwable {
        final List<Integer> inputs = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
        final ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(2);
        try {
            // flatMap subscribes to each inner Observable on the pool, so the heavy work
            // runs concurrently. Results arrive in completion order, not input order.
            Iterable<Integer> outputs = Observable.<Integer>from(inputs)
                    .flatMap((Integer input) -> deferHeavyWeightStuff(input).subscribeOn(Schedulers.from(threadPoolExecutor)))
                    .toBlocking()
                    .toIterable();
            // toBlocking() hands the results back to the calling (main) thread.
            for (Integer output : outputs) {
                System.out.println(output);
            }
        } finally {
            // Shut the pool down even if the stream fails, otherwise the JVM may not exit.
            threadPoolExecutor.shutdown();
        }
    }

    public static void sleepQuietly(int duration, TimeUnit unit) {
        try {
            Thread.sleep(unit.toMillis(duration));
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    // defer() postpones the heavy call until subscription, so it runs on the
    // scheduler given to subscribeOn() rather than on the calling thread.
    public static Observable<Integer> deferHeavyWeightStuff(final int input) {
        return Observable.defer(() -> Observable.just(doHeavyWeightStuff(input)));
    }

    // Returns a random int between min and max, both inclusive.
    public static int randomInt(int min, int max) {
        return RANDOM.nextInt((max - min) + 1) + min;
    }

    // Simulates a slow call (1 to 5 seconds) and returns the square of the input.
    public static int doHeavyWeightStuff(int input) {
        sleepQuietly(randomInt(1, 5), TimeUnit.SECONDS);
        return input * input;
    }
}
Cheers!