Why won't parallelStream use all of the commonPool's threads in recursion?

When I run the following code, only 2 of the 8 available threads are used. Can anyone explain why this is the case? How can I change the code so that it takes advantage of all 8 threads?

Tree.java:

package il.co.roy;

import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

public class Tree<T>
{
    private final T data;
    private final Set<Tree<T>> subTrees;

    public Tree(T data, Set<Tree<T>> subTrees)
    {
        this.data = data;
        this.subTrees = subTrees;
    }

    public Tree(T data)
    {
        this(data, new HashSet<>());
    }

    public Tree()
    {
        this(null);
    }

    public T getData()
    {
        return data;
    }

    public Set<Tree<T>> getSubTrees()
    {
        return subTrees;
    }

    @Override
    public boolean equals(Object o)
    {
        if (this == o)
            return true;
        if (o == null || getClass() != o.getClass())
            return false;
        Tree<?> tree = (Tree<?>) o;
        return Objects.equals(data, tree.data) &&
                Objects.equals(subTrees, tree.subTrees);
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(data, subTrees);
    }

    @Override
    public String toString()
    {
        return "Tree{" +
                "data=" + data +
                ", subTrees=" + subTrees +
                '}';
    }

    public void sendCommandAll()
    {
        if (data != null)
            System.out.println("[" + Thread.currentThread().getName() + "] sending command to " + data);
        try
        {
            Thread.sleep(5000);
        } catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        if (data != null)
            System.out.println("[" + Thread.currentThread().getName() + "] tree with data " + data + " got " + true);
        subTrees.parallelStream()
//              .map(Tree::sendCommandAll)
                .forEach(Tree::sendCommandAll);
//              .reduce(true, (aBoolean, aBoolean2) -> aBoolean && aBoolean2);
    }
}

(It doesn't matter whether I use forEach or reduce.)

Main.java:

package il.co.roy;

import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class Main
{
    public static void main(String... args)
    {
        System.out.println("Processors: " + Runtime.getRuntime().availableProcessors());


        final Tree<Integer> root = new Tree<>(null,
                Set.of(new Tree<>(1,
                        IntStream.range(2, 7)
                                        .boxed()
                                        .map(Tree::new)
                                        .collect(Collectors.toSet()))));

        root.sendCommandAll();

//      IntStream.generate(() -> 1)
//              .parallel()
//              .forEach(i ->
//              {
//                  System.out.println(Thread.currentThread().getName());
//                  try
//                  {
//                      Thread.sleep(5000);
//                  } catch (InterruptedException e)
//                  {
//                      e.printStackTrace();
//                  }
//              });
    }
}

In the main method I create a tree with the following structure:

root (data is `null`)
  |- 1
     |- 2
     |- 3
     |- 4
     |- 5
     |- 6

sendCommandAll processes the subtrees (in parallel) only after their parent has finished being processed, but the result is as follows:

Processors: 8
[main] sending command to 1
[main] tree with data 1 got true
[main] sending command to 6
[ForkJoinPool.commonPool-worker-2] sending command to 5
[main] tree with data 6 got true
[ForkJoinPool.commonPool-worker-2] tree with data 5 got true
[ForkJoinPool.commonPool-worker-2] sending command to 4
[ForkJoinPool.commonPool-worker-2] tree with data 4 got true
[ForkJoinPool.commonPool-worker-2] sending command to 3
[ForkJoinPool.commonPool-worker-2] tree with data 3 got true
[ForkJoinPool.commonPool-worker-2] sending command to 2
[ForkJoinPool.commonPool-worker-2] tree with data 2 got true

(For the record, when I execute the commented-out code in Main.java, the JVM uses all 7 (+ 1) available threads: the 7 commonPool workers plus the main thread.)
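The "7 (+ 1)" comes from the size of the common pool. A small sketch to check it on your own machine (the class and method names are illustrative): by default the common pool's parallelism is availableProcessors() - 1, and the thread that starts the parallel stream also takes part in the work.

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolInfo {
    // Target parallelism of the shared pool that parallel streams run on.
    // By default availableProcessors() - 1 (reported as at least 1), unless the
    // java.util.concurrent.ForkJoinPool.common.parallelism property overrides it.
    static int commonPoolParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        System.out.println("Processors: " + Runtime.getRuntime().availableProcessors());
        System.out.println("commonPool parallelism: " + commonPoolParallelism());
        // The calling thread (here: main) also executes part of a parallel
        // stream, so up to parallelism + 1 threads can be busy at once.
    }
}
```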

How can I improve my code?

Asked by Roy Ash on Oct 13 '21 at 12:10


1 Answer

As explained in the second half of this answer, thread utilization when processing a HashMap or HashSet depends on the distribution of the elements within the backing array, which in turn depends on their hash codes. Especially with a small number of elements compared to the (default) capacity, this may result in poor work splitting.
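The effect can be reproduced without any threads by splitting spliterators by hand. This sketch assumes OpenJDK's HashMap and ArrayList spliterators, and it models the stream framework with an assumed rule: keep splitting while estimateSize() is above 1, which is what OpenJDK's parallel forEach ends up with for an input this small. It uses Integers with the same hash codes as the question's leaf trees (Objects.hash(d, emptySet) evaluates to 31 * (31 + d)), so they land in buckets 11 to 15 of the default 16-bucket table. SplitDemo, leafSizes and questionLikeSet are illustrative names.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Spliterator;

public class SplitDemo {
    // Returns how many elements each leaf of the split tree actually holds.
    static <T> List<Integer> leafSizes(Spliterator<T> s) {
        List<Integer> out = new ArrayList<>();
        split(s, out);
        return out;
    }

    // Assumed threshold: split while the *estimated* size is above 1.
    private static <T> void split(Spliterator<T> s, List<Integer> out) {
        Spliterator<T> prefix;
        if (s.estimateSize() > 1 && (prefix = s.trySplit()) != null) {
            split(prefix, out); // left part returned by trySplit()
            split(s, out);      // s now covers the remaining right part
        } else {
            int[] count = {0};
            s.forEachRemaining(e -> count[0]++);
            out.add(count[0]);
        }
    }

    // Same hash codes as the question's leaves: 31 * (31 + d) for d = 2..6.
    static Set<Integer> questionLikeSet() {
        Set<Integer> set = new HashSet<>(); // default capacity: 16 buckets
        for (int d = 2; d <= 6; d++) set.add(31 * (31 + d));
        return set;
    }

    public static void main(String[] args) {
        Set<Integer> set = questionLikeSet();
        System.out.println("HashSet leaves:   " + leafSizes(set.spliterator()));
        System.out.println("ArrayList leaves: " + leafSizes(new ArrayList<>(set).spliterator()));
    }
}
```

On OpenJDK this should print [0, 0, 1, 4] for the HashSet, consistent with the question's output where one thread handled node 6 alone while another handled 5, 4, 3 and 2 in turn, and [1, 1, 1, 1, 1] for the ArrayList copy.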

A simple work-around is using new ArrayList<>(subTrees).parallelStream() instead of subTrees.parallelStream().
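Why the copy helps can also be seen from the documented spliterator characteristics: HashSet.spliterator() reports only SIZED and DISTINCT, while ArrayList.spliterator() additionally reports SUBSIZED, i.e. every split has an exact, index-based size. A minimal sketch; BalancedStreams, balancedParallelStream and subSized are hypothetical helper names.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.Spliterator;
import java.util.stream.Stream;

public class BalancedStreams {
    // Hypothetical helper: copy first so the parallel stream splits by index
    // (exact halves) rather than by hash-bucket ranges.
    static <E> Stream<E> balancedParallelStream(Collection<E> source) {
        return new ArrayList<>(source).parallelStream();
    }

    // True if the collection's spliterator guarantees exact sizes after splitting.
    static boolean subSized(Collection<?> c) {
        return c.spliterator().hasCharacteristics(Spliterator.SUBSIZED);
    }

    public static void main(String[] args) {
        Set<Integer> set = new HashSet<>(Set.of(2, 3, 4, 5, 6));
        System.out.println("HashSet SUBSIZED:   " + subSized(set));
        System.out.println("ArrayList SUBSIZED: " + subSized(new ArrayList<>(set)));
        balancedParallelStream(set).forEach(i ->
                System.out.println("[" + Thread.currentThread().getName() + "] " + i));
    }
}
```

The copy costs one pass over the set, which is negligible next to work that takes seconds per element.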

But note that your method performs the actual work of the current node (simulated in the example with a sleep) before processing the children, which also reduces the potential parallelism.

You may use

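// Replaces Tree.sendCommandAll(); Tree.java additionally needs
// import java.util.ArrayList; and import java.util.List;
public void sendCommandAll() {
    if(subTrees.isEmpty()) {
        // leaf: nothing to parallelize, just do this node's own work
        actualSendCommand();
        return;
    }
    // run the children and this node's own work as tasks of the same parallel stream;
    // the ArrayList copy also splits evenly, unlike the HashSet
    List<Tree<T>> tmp = new ArrayList<>(subTrees.size() + 1);
    tmp.addAll(subTrees);
    tmp.add(this);
    tmp.parallelStream().forEach(t -> {
        if(t != this) t.sendCommandAll(); else t.actualSendCommand();
    });
}

// the node's own work (formerly the beginning of sendCommandAll)
private void actualSendCommand() {
    if (data != null)
        System.out.println("[" + Thread.currentThread().getName()
                         + "] sending command to " + data);
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    if (data != null)
        System.out.println("[" + Thread.currentThread().getName()
                         + "] tree with data " + data + " got " + true);
}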

This allows the current node to be processed concurrently with its children.
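A self-contained, runnable sketch of the same approach, assuming a trimmed-down hypothetical Node class instead of the full Tree<T>; ConcurrentTreeDemo, processed, questionTree and the workMillis parameter are illustrative names. It also swaps printStackTrace() for restoring the interrupt flag, and records which nodes were handled so the result can be checked.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

public class ConcurrentTreeDemo {
    // Hypothetical minimal node: data plus a set of children, as in the question.
    static final class Node {
        final Integer data;
        final Set<Node> children;
        Node(Integer data, Set<Node> children) { this.data = data; this.children = children; }
    }

    // Data of every node whose command has completed (thread-safe).
    static final ConcurrentLinkedQueue<Integer> processed = new ConcurrentLinkedQueue<>();

    // The node's own work is one more task next to its children.
    static void sendCommandAll(Node node, long workMillis) {
        if (node.children.isEmpty()) {
            actualSendCommand(node, workMillis);
            return;
        }
        List<Node> tasks = new ArrayList<>(node.children.size() + 1);
        tasks.addAll(node.children); // index-based splitting instead of hash buckets
        tasks.add(node);
        tasks.parallelStream().forEach(t -> {
            if (t != node) sendCommandAll(t, workMillis); else actualSendCommand(t, workMillis);
        });
    }

    static void actualSendCommand(Node node, long workMillis) {
        try {
            Thread.sleep(workMillis); // stand-in for sending the real command
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (node.data != null) {
            processed.add(node.data);
            System.out.println("[" + Thread.currentThread().getName() + "] done " + node.data);
        }
    }

    // Same shape as the question: root (null) -> 1 -> {2, 3, 4, 5, 6}.
    static Node questionTree() {
        Set<Node> leaves = new HashSet<>();
        for (int d = 2; d <= 6; d++) leaves.add(new Node(d, Set.of()));
        return new Node(null, Set.of(new Node(1, leaves)));
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        sendCommandAll(questionTree(), 1000);
        System.out.printf("elapsed: %d ms%n", (System.nanoTime() - start) / 1_000_000);
    }
}
```

With 1000 ms per node and enough idle common-pool threads, the elapsed time should come out well below the seven work units a purely sequential run needs, though the exact figure depends on the machine and on how the pool schedules the nested streams.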

Answered by Holger on Oct 06 '22 at 01:10
