 

Running operations in parallel while preserving in-order, incremental output

I have a slow, CPU-intensive operation, doWork(int x), that is called with a single integer parameter taking different values, as follows:

static String doWork(int x) {
  // do work that depends on x, takes ~60 seconds
  ...
}

public static void main(String args[]) {
  for (int i = 1; i <= 100; i++) {
    System.out.println(doWork(i));
  }
}

As each doWork() call completes, the result is printed to the console. I'd like to parallelize this: all of the doWork() calls are independent and don't mutate any shared state. Now, I could do it the old way, messing around with ExecutorService and Future.get() and so on, but I'd like to do it more cleanly with streams¹.
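
For reference, the old way I have in mind looks roughly like this (just a sketch; the pool size is an arbitrary choice): submit every call up front, then walk the Futures in submission order, so the work runs in parallel but the printing stays ordered.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public static void main(String args[]) throws Exception {
    ExecutorService pool =
        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    List<Future<String>> futures = new ArrayList<>();
    for (int i = 1; i <= 100; i++) {
        final int x = i;
        futures.add(pool.submit(() -> doWork(x)));   // runs in parallel
    }
    for (Future<String> f : futures) {
        System.out.println(f.get());                 // blocks until this result is ready
    }
    pool.shutdown();
}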

So something like this seems like it could almost work:

public static void main(String args[]) {
    IntStream.rangeClosed(1, 100).parallel()
        .forEach(i -> System.out.println(doWork(i)));
}

... but the problem is that I want to preserve the output order on the console (the line for doWork(1) should come first, and so on). I can't simply use forEachOrdered() here, because doWork() would run inside it and the whole operation would be serialized: effectively only one call at a time. The root of the problem is that forEachOrdered provides too strong a guarantee: the consumer is invoked sequentially, one element at a time. I want the consumers to be called in parallel, but the output to be in order.
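
Concretely, the variant I'm ruling out looks like this; since doWork() runs inside the ordered terminal action, the calls are effectively serialized even on a parallel stream:

IntStream.rangeClosed(1, 100).parallel()
         .forEachOrdered(i -> System.out.println(doWork(i)));  // one doWork() at a time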

So I should probably look at a map -> collect type idiom instead, where I collect the output from every doWork() call into a string and print it once:

public static void main(String[] args) {
    System.out.println(IntStream.rangeClosed(1, 100).parallel()
        .mapToObj(Main::doWork).collect(Collectors.joining("\n")));
}

Almost! The collect() method preserves encounter order, so my elements come out ordered. The problem now is that there is no incremental output - the whole job has to finish before any output occurs. I really want to preserve the behavior where the results dribble out onto the console as they complete.

I guess I want some kind of ordered-consumption terminal operation that doesn't force the whole pipeline to run sequentially. Basically it would collect results internally like a normal collector, but whenever the current "leftmost" element has been collected, it would pass it through to the consumer - so the consumer sees a stream of ordered elements, but everything upstream still happens in parallel.
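
Written out by hand, the idea is something like the sketch below (OrderedConsumer is a made-up helper, not anything in the JDK): a side buffer holds finished results keyed by their index, and releases the longest contiguous prefix to the downstream consumer as soon as it is complete.

import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.IntStream;

// Hypothetical reorder buffer: results arrive tagged with their index, in any
// order; accept() releases them to the downstream consumer in index order.
class OrderedConsumer<T> {
    private final Consumer<T> downstream;
    private final Map<Integer, T> pending = new HashMap<>();
    private int next;

    OrderedConsumer(Consumer<T> downstream, int first) {
        this.downstream = downstream;
        this.next = first;
    }

    synchronized void accept(int index, T value) {
        pending.put(index, value);
        // Flush the contiguous run starting at the current "leftmost" index.
        while (pending.containsKey(next)) {
            downstream.accept(pending.remove(next));
            next++;
        }
    }
}

public static void main(String args[]) {
    // doWork() calls run in parallel; output still appears in order 1..100.
    OrderedConsumer<String> out = new OrderedConsumer<>(System.out::println, 1);
    IntStream.rangeClosed(1, 100).parallel()
             .forEach(i -> out.accept(i, doWork(i)));
}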

Is there anything out there like this? It doesn't seem possible to build it on the existing Collector interface, since it doesn't give you a way to determine what the order of the elements is.


¹ ...and perhaps even more efficiently, since fork/join is used under the covers, so maybe I get to make use of some of the heuristics built into that framework?

asked Feb 05 '23 by BeeOnRope


1 Answer

You're pretty close. Just combine the map and forEachOrdered solutions. forEachOrdered only constrains the terminal action, not the upstream mapToObj, so the doWork() calls still run in parallel while the results are printed incrementally, in encounter order:

IntStream.rangeClosed(1, 100)
         .parallel()
         .mapToObj(Main::doWork)
         .forEachOrdered(System.out::println);

answered Feb 16 '23 by shmosel