I have a double-take with the observeOn() and subscribeOn() with RxJava. I understand that they do not parallelize emissions on a single stream. In other words, a single stream of emissions will only be put on one thread correct? My test below seemed to indicate this. My understanding also is you have to flatMap() a scheduler, as in .flatMap(v -> Observable.just(v).subscribeOn(Schedulers.computation())), to parallelize emissions on a single stream.
Also if that is the case then could thread starvation happen with the schedulers? If my computation scheduler has 5 threads, but I have more than 5 long-running asynchronous streams being processed is there a possibility starvation could occur? Or is this unlikely just because of RxJava's nature?
public class Test {
public static void main(String[] args) {
Observable<String> airports = Observable.just("ABQ", "HOU",
"PHX", "DAL", "DFW", "AUS","SAN","LAX","JFK");
airports.subscribeOn(Schedulers.io()).map(Test::stall)
.subscribe(s -> System.out.println("Sub1 " + s +
" " + Thread.currentThread().getName()));
airports.subscribeOn(Schedulers.io()).map(Test::stall)
.subscribe(s -> System.out.println("Sub2 " + s +
" " + Thread.currentThread().getName()));
sleep();
}
private static String stall(String str) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return str;
}
private static void sleep() {
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
With flatMap, it is possible one async source is overwhelmed by others and it can't make progress on its own source. In practice, however, I haven't seen this happen due to OS and JVM hiccups that give enough breathing room and because of backpressure and arbitration in flatMap itself. If you are worried about this overwhelming, you can use the maxConcurrent parameter with a flatMap overload and limit the number of concurrent subscriptions.
RxJava is mostly written in a non-blocking fashion so when sources need to be merged or combined, they don't really wait for each other.
Computation scheduler is a pool of single-threaded executors and are assigned in round-robin fashion to callers. I don't know about the fairness of the standard executors.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With