
Why does parallelStream not use the entire available parallelism?

I have a custom ForkJoinPool created with parallelism of 25.

customForkJoinPool = new ForkJoinPool(25);

I have a list of 700 file names and I used code like this to download the files from S3 in parallel and cast them to Java objects:

customForkJoinPool.submit(() -> {
   return fileNames
     .parallelStream()
     .map((fileName) -> {
        Logger log = Logger.getLogger("ForkJoinTest");
        long startTime = System.currentTimeMillis();
        log.info("Starting job at Thread:" + Thread.currentThread().getName());
        MyObject obj = readObjectFromS3(fileName);
        long endTime = System.currentTimeMillis();
        log.info("completed a job with Latency:" + (endTime - startTime));
        return obj;
     })
     .collect(Collectors.toList());
});

When I look at the logs, I see only 5 threads being used. With a parallelism of 25, I expected this to use 25 threads. The average latency to download a file and convert it to an object is around 200ms. What am I missing?

Maybe a better question is: how does a parallelStream figure out how much to split the original list before creating threads for it? In this case, it looks like it decided to split it 5 times and stop.

Aishwar asked Jun 12 '15

2 Answers

Why are you doing this with ForkJoinPool? It's meant for CPU-bound tasks with subtasks that are too fast to warrant individual scheduling. Your workload is IO-bound and with 200ms latency the individual scheduling overhead is negligible.

Use an Executor:

import static java.util.stream.Collectors.toList;
import static java.util.concurrent.CompletableFuture.supplyAsync;

ExecutorService threads = Executors.newFixedThreadPool(25);

List<MyObject> result = fileNames.stream()
        .map(fn -> supplyAsync(() -> readObjectFromS3(fn), threads))
        .collect(toList()).stream()
        .map(CompletableFuture::join)
        .collect(toList());
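Here is a self-contained sketch of that pattern (`readObjectFromS3` is stubbed out for illustration, since the real S3 call isn't shown):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.stream.Collectors.toList;

public class S3Demo {
    // Stub standing in for the real S3 download; an assumption for this sketch.
    static String readObjectFromS3(String fileName) {
        return "object:" + fileName;
    }

    static List<String> downloadAll(List<String> fileNames) {
        ExecutorService threads = Executors.newFixedThreadPool(25);
        try {
            return fileNames.stream()
                    // first pass: submit every download before anything blocks
                    .map(fn -> supplyAsync(() -> readObjectFromS3(fn), threads))
                    .collect(toList())
                    // second pass: block for the results
                    .stream()
                    .map(CompletableFuture::join)
                    .collect(toList());
        } finally {
            threads.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(downloadAll(List.of("a.json", "b.json")));
    }
}
```

The intermediate `collect(toList())` matters: streams are lazy, so mapping straight from `supplyAsync` to `join` would submit and then wait for one future at a time, serializing the downloads. Collecting first forces all 25-way submissions before any `join` blocks.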
Misha answered Nov 15 '22


I think the answer is in this passage from the ForkJoinPool javadoc:

"The pool attempts to maintain enough active (or available) threads by dynamically adding, suspending, or resuming internal worker threads, even if some tasks are stalled waiting to join others. However, no such adjustments are guaranteed in the face of blocked I/O or other unmanaged synchronization."

In your case, the downloads will be performing blocking I/O operations.
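If you do want to keep the ForkJoinPool, the documented escape hatch is `ForkJoinPool.ManagedBlocker`, which tells the pool a worker is about to block so it can add a compensating thread. A minimal sketch (the `blockingCall` helper is a name I made up; the `6 * 7` callable stands in for the blocking S3 read):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;

public class ManagedBlockDemo {
    // Hypothetical helper: runs a blocking call under managedBlock so the
    // ForkJoinPool can spawn a compensating worker while this thread waits.
    static <T> T blockingCall(Callable<T> call) throws InterruptedException {
        class Blocker implements ForkJoinPool.ManagedBlocker {
            T result;
            boolean done;
            public boolean block() {
                try {
                    result = call.call();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
                done = true;
                return true;
            }
            public boolean isReleasable() {
                return done;
            }
        }
        Blocker b = new Blocker();
        ForkJoinPool.managedBlock(b);
        return b.result;
    }

    static int demo() throws InterruptedException {
        ForkJoinPool pool = new ForkJoinPool(25);
        return pool.submit(() -> {
            try {
                return blockingCall(() -> 6 * 7); // pretend this is the S3 read
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }).join();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // 42
    }
}
```

That said, for plain IO-bound work a fixed-size `ExecutorService`, as in the other answer, is simpler.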

Stephen C answered Nov 15 '22