 

Java parallel stream: how to wait for threads for a parallel stream to finish?

So I have a list from which I obtain a parallel stream to fill out a map, as follows:

Map<Integer, TreeNode> map = new HashMap<>();
List<NodeData> list = some_filled_list;

// Putting data from the list into the map
list.parallelStream().forEach(d -> {
    TreeNode node = new TreeNode(d);
    map.put(node.getId(), node);
});

// Print out the map
map.entrySet().stream().forEach(entry -> {
    System.out.println("Processing node with ID = " + entry.getValue().getId());
});

The problem with this code is that the map is printed out while the "putting data" process is still going on (because it's parallel), so the map has not yet received all the elements from the list. Of course, my real code does more than print the map; I use a map to take advantage of its O(1) lookup time.

My questions are:

  1. How do I make the main thread wait until the "putting data" step has finished before the map is printed out? I tried putting the "putting data" step inside a thread t and calling t.start() and t.join(), but that doesn't help.

  2. Maybe I am not supposed to use a parallel stream in this case? The list is long, and I just want to take advantage of parallelism to improve efficiency.

asked Jan 12 '18 at 06:01 by Simo




3 Answers

With this list.parallelStream().forEach you are violating the guidance on side effects that is explicitly stated in the Stream documentation: the lambda mutates a shared map instead of producing a result.

Also, when you say that the map is being printed out while the "putting data" process is still going on (because it's parallel), that is not true: forEach is a terminal operation, and it waits until it has finished before execution moves on to the next line. You might be seeing it that way because you are collecting into a non-thread-safe HashMap, so some entries might be missing from that map. Think about it the other way: what would happen if you put multiple entries from multiple threads into a HashMap? Lots of things can break, such as missing entries or an incorrect/inconsistent Map.
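A minimal sketch of the blocking behaviour, assuming a plain list of integers as a stand-in for the question's NodeData list (the class and method names here are hypothetical). A thread-safe AtomicInteger is incremented once per element inside a parallel forEach; because forEach is terminal, the counter read on the next line already equals the list size.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ForEachBlocksDemo {
    // Runs a parallel forEach over n elements and reports how many were processed
    static int countProcessed(int n) {
        // Stand-in data: plain integers instead of the question's NodeData objects
        List<Integer> list = IntStream.range(0, n).boxed().collect(Collectors.toList());

        // AtomicInteger is thread-safe, so increments from different workers are not lost
        AtomicInteger processed = new AtomicInteger();
        list.parallelStream().forEach(d -> processed.incrementAndGet());

        // forEach is terminal: by the time it returns, every element has been handled
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(countProcessed(100_000) == 100_000); // prints true
    }
}
```

The counter is deliberately an AtomicInteger rather than a HashMap, so the only thing being tested is whether forEach waits, not whether the shared container is thread-safe.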

Of course, changing that to a ConcurrentHashMap would work, since it's thread-safe, but you are still violating the side-effect property, although in a "safe" way.
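A sketch of that ConcurrentHashMap variant of the question's code. NodeData and TreeNode below are hypothetical minimal stand-ins (an int id and a getId() accessor are assumed); only the map type changes relative to the question.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ConcurrentMapForEach {
    // Hypothetical stand-ins for the question's NodeData and TreeNode classes
    static final class NodeData {
        private final int id;
        NodeData(int id) { this.id = id; }
        int getId() { return id; }
    }

    static final class TreeNode {
        private final int id;
        TreeNode(NodeData d) { this.id = d.getId(); }
        int getId() { return id; }
    }

    // Builds n sample NodeData objects with ids 0..n-1
    static List<NodeData> sample(int n) {
        return IntStream.range(0, n).mapToObj(NodeData::new).collect(Collectors.toList());
    }

    static Map<Integer, TreeNode> build(List<NodeData> list) {
        // ConcurrentHashMap is safe to put into from several worker threads at once
        Map<Integer, TreeNode> map = new ConcurrentHashMap<>();
        list.parallelStream().forEach(d -> {
            TreeNode node = new TreeNode(d);
            map.put(node.getId(), node); // still a side effect, but a thread-safe one
        });
        return map; // forEach has finished, so every node is in the map
    }

    public static void main(String[] args) {
        System.out.println(build(sample(100_000)).size()); // prints 100000
    }
}
```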

The correct thing to do is to collect to a Map directly without forEach:

Map<Integer, TreeNode> map = list.parallelStream()
        .collect(Collectors.toMap(
                NodeData::getId,
                TreeNode::new
        ));

This way, everything works correctly even with parallel processing. Just note that you would need a lot of elements (tens of thousands) before parallel processing gives any measurable performance increase.
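A self-contained sketch of the collect approach, again with hypothetical NodeData/TreeNode stand-ins (NodeData is assumed to expose getId()). It also shows Collectors.toConcurrentMap as an alternative: instead of merging per-thread partial maps, all workers insert into one ConcurrentMap. One caveat worth knowing: the two-argument toMap throws IllegalStateException on a duplicate key, whereas the question's map.put silently overwrote; the merge function (a, b) -> a here is an assumed policy that keeps the node already present.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class CollectToMapDemo {
    // Hypothetical stand-ins for the question's classes; NodeData is assumed to expose getId()
    static final class NodeData {
        private final int id;
        NodeData(int id) { this.id = id; }
        int getId() { return id; }
    }

    static final class TreeNode {
        private final int id;
        TreeNode(NodeData d) { this.id = d.getId(); }
        int getId() { return id; }
    }

    // Builds n sample NodeData objects with ids 0..n-1
    static List<NodeData> sample(int n) {
        return IntStream.range(0, n).mapToObj(NodeData::new).collect(Collectors.toList());
    }

    // The answer's approach: no shared state; the collector merges per-thread partial maps
    static Map<Integer, TreeNode> viaToMap(List<NodeData> list) {
        return list.parallelStream()
                .collect(Collectors.toMap(NodeData::getId, TreeNode::new));
    }

    // Variant: workers insert into one ConcurrentMap; on a duplicate id, (a, b) -> a
    // keeps the node already in the map instead of throwing IllegalStateException
    static ConcurrentMap<Integer, TreeNode> viaToConcurrentMap(List<NodeData> list) {
        return list.parallelStream()
                .collect(Collectors.toConcurrentMap(NodeData::getId, TreeNode::new, (a, b) -> a));
    }

    public static void main(String[] args) {
        List<NodeData> list = sample(100_000);
        System.out.println(viaToMap(list).size());           // prints 100000
        System.out.println(viaToConcurrentMap(list).size()); // prints 100000
    }
}
```

Either way, the map is the return value of the terminal operation, so it is complete as soon as collect returns.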

answered Sep 19 '22 at 08:09 by Eugene


Terminal stream operations block until they are done, for both parallel and sequential streams.

So what you see is not the "putting data" process still going on; most likely it's just data corruption, since HashMap is not thread-safe. Try using ConcurrentHashMap instead.

answered Sep 20 '22 at 08:09 by Stadnyk Oleksii


I would guess that, if it is possible for the stream to still be processing, you could try something like:

    List<NodeData> list = new ArrayList<>();

    //Putting data from the list into the map
    Map<Integer, TreeNode> map = list.parallelStream()
            .collect(Collectors.toMap(
                    n -> n.getId(),
                    n -> new TreeNode(n)
            ));

At least now the map is produced by the stream's terminal operation rather than by side effects. The stream can still use multiple threads where possible, and the mapping is certainly going to be complete once collect returns.

answered Sep 22 '22 at 08:09 by OldCurmudgeon