
Java 8: Parallel stream to wait until all the threads finish the task [closed]

I am using a parallel stream to process a large number of files, each containing a large amount of data, and to write them out in a particular format. Here is the code:

public static void main(String[] args) throws Exception {
  mergeController.compactFiles();
  mergeController.writeMergedFlag();
}

private void compactFiles() {
  Set<String> events = this.listSubDirectoryNames(inputDir);
  events.parallelStream().forEach(event -> writeEvent(event, eventSchemaMap.get(event), this.configuration));
}

These methods don't return anything because they only write the files. I am seeing that writeMergedFlag() is usually called only after about 1.5 hours of running the process.

What is the issue here? Is it a heap space issue or something else? I haven't encountered this type of issue before.

Himanshu Yadav asked Jul 24 '18 20:07


2 Answers

I reckon it's because a parallel stream uses a ForkJoinPool, which has a fixed number of threads. If these writeEvent tasks are small, I'd recommend using a cached thread pool instead:

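import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public static void main(String[] args) throws Exception {
    mergeController.compactFiles();
    mergeController.writeMergedFlag();
}

// awaitTermination can throw InterruptedException, so the method has to declare it
private void compactFiles() throws InterruptedException {
    Set<String> events = this.listSubDirectoryNames(inputDir);
    ExecutorService service = Executors.newCachedThreadPool();
    // Submit one task per event; the pool creates threads as needed
    events.forEach(event -> service.execute(() -> writeEvent(event, eventSchemaMap.get(event), configuration)));
    service.shutdown(); // stop accepting new tasks; already submitted ones still run
    service.awaitTermination(1, TimeUnit.DAYS); // Arbitrary value; blocks until all tasks finish or the timeout elapses
}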
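As a self-contained sketch of the same idea (not the asker's actual code), the following uses a fixed-size pool rather than a cached one, since a cached pool can start one thread per submitted task when there are many slow tasks. It also checks the boolean returned by awaitTermination, which is false if the timeout elapsed before the tasks finished. The class name ExecutorCompaction, the written set, and the one-argument writeEvent are hypothetical stand-ins for illustration only.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorCompaction {

    // Hypothetical stand-in for the real output: records each event instead of writing a file.
    static final Set<String> written = ConcurrentHashMap.newKeySet();

    // Hypothetical simplification of writeEvent(event, schema, configuration).
    static void writeEvent(String event) {
        written.add(event);
    }

    // Returns true only if every task finished before the timeout elapsed.
    static boolean compactFiles(Set<String> events, int threads) throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(threads);
        try {
            for (String event : events) {
                service.execute(() -> writeEvent(event));
            }
        } finally {
            service.shutdown(); // stop accepting new tasks; queued ones still run
        }
        return service.awaitTermination(1, TimeUnit.DAYS); // arbitrary upper bound
    }

    public static void main(String[] args) throws InterruptedException {
        Set<String> events = new HashSet<>(Arrays.asList("click", "view", "purchase"));
        boolean finished = compactFiles(events, 2);
        // Only continue with a follow-up step (such as writing a merged flag) once finished is true.
        System.out.println("finished=" + finished + ", written=" + written.size());
    }
}
```

Because compactFiles only returns after awaitTermination, anything called after it in main runs once all submitted tasks are done, provided the timeout was not hit.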
Jacob G. answered Oct 12 '22 22:10


By default, every parallelStream() in the JVM uses the same ForkJoinPool.commonPool(), which has numberOfCPUs - 1 worker threads. So in your case you first need to check with a profiler what is consuming the time, and if there are simply a lot of files to process, you can use a custom thread pool for your parallel stream.

private void compactFiles() throws Exception {
  Set<String> events = this.listSubDirectoryNames(inputDir);
  ForkJoinPool customThreadPool = new ForkJoinPool(4); // you might need to adjust this value to find optimal performance
  try {
    // Due to how ForkJoinPool works, tasks will be submitted to the same pool that was used to execute the parent task
    customThreadPool.submit(() -> events.parallelStream().forEach(event -> writeEvent(event, eventSchemaMap.get(event), this.configuration))).get();
  } finally {
    customThreadPool.shutdown(); // release the worker threads once the stream has finished
  }
}

http://www.baeldung.com/java-8-parallel-streams-custom-threadpool
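To check the behaviour for yourself, here is a minimal sketch, assuming a short sleep as a stand-in for the slow writeEvent call. It prints the parallelism of the common pool and shows that forEach, being a terminal operation, only returns after every element has been processed, so the call that follows it cannot start early. The class name ParallelStreamBlocking and its helper methods are hypothetical. Note that running a parallel stream inside a custom ForkJoinPool relies on an implementation detail that the Stream API documentation does not promise.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;

public class ParallelStreamBlocking {

    // Hypothetical stand-in for a slow file write.
    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Runs a parallel forEach inside a dedicated pool and returns how many
    // elements had been processed at the moment forEach returned.
    static int processedWhenForEachReturns(List<String> events, int parallelism) throws Exception {
        AtomicInteger processed = new AtomicInteger();
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.submit(() -> {
                events.parallelStream().forEach(event -> {
                    sleepQuietly(20);
                    processed.incrementAndGet();
                });
                return processed.get(); // read right after the terminal operation
            }).get();
        } finally {
            pool.shutdown(); // release the worker threads
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> events = Arrays.asList("click", "view", "purchase", "refund", "signup", "logout");
        System.out.println("common pool parallelism: " + ForkJoinPool.commonPool().getParallelism());
        System.out.println("processed when forEach returned: "
                + processedWhenForEachReturns(events, 4) + " of " + events.size());
    }
}
```

If the count always equals the number of events, the stream is already waiting for all of its tasks, and a long delay before writeMergedFlag() points at the work itself being slow rather than at a missing wait.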

Ivan answered Oct 12 '22 21:10