I have a slow, CPU-intensive operation, doWork(int x), that is called with different values of a single integer parameter, as follows:
static String doWork(int x) {
    // do work that depends on x, takes ~60 seconds
    ...
}

public static void main(String[] args) {
    for (int i = 1; i < 100; i++) {
        System.out.println(doWork(i));
    }
}
As each doWork() call completes, its result is output to the console. I'd like to parallelize this - all of the doWork() calls are independent and don't mutate any shared state. Now, I could do it the old way, messing around with ExecutorService and Future.get() and so on (roughly the sketch below), but I'd like to do it more cleanly with streams1.
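For reference, the old way I have in mind is roughly this. The class and the doWork() body here are just stubs of mine, standing in for the real computation:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class OldWay {

    static String doWork(int x) {
        // stub standing in for the real ~60-second computation
        return "result for " + x;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors());
        List<Future<String>> futures = new ArrayList<>();
        for (int i = 1; i < 100; i++) {
            final int x = i;
            futures.add(pool.submit(() -> doWork(x))); // work runs in parallel
        }
        for (Future<String> f : futures) {
            System.out.println(f.get()); // results print in submission order
        }
        pool.shutdown();
    }
}

It works - the ordered printing comes for free from iterating the futures in submission order - but it's more ceremony than I'd like.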
So something like this seems like it could almost work:
public static void main(String[] args) {
    IntStream.rangeClosed(1, 100).parallel()
             .forEach(i -> System.out.println(doWork(i)));
}
... but the problem is that I want to preserve the output order on the console (the line for doWork(1) should come first, and so on). I can't just use forEachOrdered(), because that serializes the whole operation: the doWork() calls would effectively run one at a time. The root of the problem is that forEachOrdered provides too strong a guarantee: the consumer is called sequentially, on one element at a time. I want the doWork() calls to run in parallel, but the output to be in order.
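Concretely, the variant I'm ruling out is this one, where doWork() sits inside the ordered terminal action, so the expensive calls end up running one at a time:

public static void main(String[] args) {
    IntStream.rangeClosed(1, 100).parallel()
             .forEachOrdered(i -> System.out.println(doWork(i)));
}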
So I should probably look at a map -> collect type idiom instead, where I collect the output from every doWork() call into a string and print it once:
public static void main(String[] args) {
    System.out.println(IntStream.rangeClosed(1, 100).parallel()
            .mapToObj(Main::doWork)
            .collect(Collectors.joining("\n")));
}
Almost! The collect() method keeps encounter order, so my elements are ordered. The problem now is that there is no incremental output - the whole job has to finish before any output occurs. I really want to preserve the behavior where the updates dribble out onto the console as they become available.
I guess I want some kind of ordered-consumption terminal operation that doesn't force the whole pipeline to be ordered. Basically it would collect results internally like a normal collector, but when the current "leftmost" element gets collected, it would pass it through to the consumer - so the consumer sees a stream of ordered elements, but everything upstream still happens in parallel.
Is there anything out there like that? It doesn't seem possible to build it on the existing Collector interface, since a Collector has no way to determine what the encounter order of the elements is.
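To make the idea concrete, here is a rough hand-rolled sketch of what I'm imagining - not a Collector at all, just a buffering wrapper (the OrderedEmitter name and the doWork() stub are mine, purely for illustration) that holds on to out-of-order results and releases each one as soon as everything before it has been emitted:

import java.util.TreeMap;
import java.util.function.Consumer;
import java.util.stream.IntStream;

public class Main {

    static String doWork(int x) {
        // stub standing in for the real ~60-second computation
        return "result for " + x;
    }

    // Buffers out-of-order results and hands each one to the downstream
    // consumer as soon as it becomes the next element in index order.
    static class OrderedEmitter<T> {
        private final Consumer<T> downstream;
        private final TreeMap<Integer, T> pending = new TreeMap<>();
        private int next;

        OrderedEmitter(int first, Consumer<T> downstream) {
            this.next = first;
            this.downstream = downstream;
        }

        synchronized void accept(int index, T value) {
            pending.put(index, value);
            while (pending.containsKey(next)) {
                downstream.accept(pending.remove(next));
                next++;
            }
        }
    }

    public static void main(String[] args) {
        OrderedEmitter<String> emitter = new OrderedEmitter<>(1, System.out::println);
        // doWork() runs in parallel; only the bookkeeping in accept() is synchronized.
        IntStream.rangeClosed(1, 100).parallel()
                 .forEach(i -> emitter.accept(i, doWork(i)));
    }
}

The expensive doWork() calls happen in parallel before accept() is entered, and only the small amount of bookkeeping is synchronized - but this feels like something the stream library should be able to do for me.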
1 ...and perhaps even more efficiently, since fork/join is used under the covers, so maybe I get to make use of some of the heuristics built into that framework?
You're pretty close. Just combine the map and forEachOrdered solutions:
IntStream.rangeClosed(1, 100)
         .parallel()
         .mapToObj(Main::doWork)
         .forEachOrdered(System.out::println);
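The mapToObj step still runs the doWork() calls in parallel; forEachOrdered only constrains the order in which the already-computed results are handed to the action, buffering any that finish early. A complete runnable version, with a stub doWork() standing in for the real 60-second computation, would look something like this:

import java.util.stream.IntStream;

public class Main {

    static String doWork(int x) {
        // stub standing in for the real ~60-second computation
        return "result for " + x;
    }

    public static void main(String[] args) {
        IntStream.rangeClosed(1, 100)
                 .parallel()
                 .mapToObj(Main::doWork)               // runs in parallel
                 .forEachOrdered(System.out::println); // prints in encounter order
    }
}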