When I run the following code, only 2 of the 8 available threads are used. Can anyone explain why this is the case? How can I change the code so that it takes advantage of all 8 threads?
Tree.java:
package il.co.roy;

import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

public class Tree<T>
{
    private final T data;
    private final Set<Tree<T>> subTrees;

    public Tree(T data, Set<Tree<T>> subTrees)
    {
        this.data = data;
        this.subTrees = subTrees;
    }

    public Tree(T data)
    {
        this(data, new HashSet<>());
    }

    public Tree()
    {
        this(null);
    }

    public T getData()
    {
        return data;
    }

    public Set<Tree<T>> getSubTrees()
    {
        return subTrees;
    }

    @Override
    public boolean equals(Object o)
    {
        if (this == o)
            return true;
        if (o == null || getClass() != o.getClass())
            return false;
        Tree<?> tree = (Tree<?>) o;
        return Objects.equals(data, tree.data) &&
                Objects.equals(subTrees, tree.subTrees);
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(data, subTrees);
    }

    @Override
    public String toString()
    {
        return "Tree{" +
                "data=" + data +
                ", subTrees=" + subTrees +
                '}';
    }

    public void sendCommandAll()
    {
        if (data != null)
            System.out.println("[" + Thread.currentThread().getName() + "] sending command to " + data);
        try
        {
            Thread.sleep(5000);
        } catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        if (data != null)
            System.out.println("[" + Thread.currentThread().getName() + "] tree with data " + data + " got " + true);
        subTrees.parallelStream()
//                .map(Tree::sendCommandAll)
                .forEach(Tree::sendCommandAll);
//                .reduce(true, (aBoolean, aBoolean2) -> aBoolean && aBoolean2);
    }
}
(It doesn't matter whether I use forEach or reduce.)
Main.java:
package il.co.roy;

import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class Main
{
    public static void main(String... args)
    {
        System.out.println("Processors: " + Runtime.getRuntime().availableProcessors());
        final Tree<Integer> root = new Tree<>(null,
                Set.of(new Tree<>(1,
                        IntStream.range(2, 7)
                                .boxed()
                                .map(Tree::new)
                                .collect(Collectors.toSet()))));
        root.sendCommandAll();
//        IntStream.generate(() -> 1)
//                .parallel()
//                .forEach(i ->
//                {
//                    System.out.println(Thread.currentThread().getName());
//                    try
//                    {
//                        Thread.sleep(5000);
//                    } catch (InterruptedException e)
//                    {
//                        e.printStackTrace();
//                    }
//                });
    }
}
In the main method I create a tree with the following structure:
root (data is `null`)
|- 1
   |- 2
   |- 3
   |- 4
   |- 5
   |- 6
The sendCommandAll function processes the sub-trees of a node (in parallel) only after the node itself has finished being processed.
But the result is as follows:
Processors: 8
[main] sending command to 1
[main] tree with data 1 got true
[main] sending command to 6
[ForkJoinPool.commonPool-worker-2] sending command to 5
[main] tree with data 6 got true
[ForkJoinPool.commonPool-worker-2] tree with data 5 got true
[ForkJoinPool.commonPool-worker-2] sending command to 4
[ForkJoinPool.commonPool-worker-2] tree with data 4 got true
[ForkJoinPool.commonPool-worker-2] sending command to 3
[ForkJoinPool.commonPool-worker-2] tree with data 3 got true
[ForkJoinPool.commonPool-worker-2] sending command to 2
[ForkJoinPool.commonPool-worker-2] tree with data 2 got true
(For the record, when I execute the commented-out code in Main.java, the JVM uses all 7 (+ 1) threads available in commonPool.)
How can I improve my code?
A parallel stream runs its work on several threads at once and internally uses the fork/join framework to create and manage them.
We can actually supply a custom thread pool when processing the stream.
By default, a parallel stream runs in the common fork/join pool, whose size is derived from the number of cores of the computer or VM on which the program is running (its default parallelism is one less than the number of available processors, and the calling thread takes part as well). For example, with a list of 15 numbers there are 15 elements to process, but no more of them run at the same time than there are threads available.
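To see what the default looks like on a particular machine, the following minimal sketch prints the processor count next to the parallelism of the common pool. The class name `CommonPoolInfo` and its helper method are made up for illustration; the only library call involved is `ForkJoinPool.getCommonPoolParallelism()`.

```java
import java.util.concurrent.ForkJoinPool;

final class CommonPoolInfo {
    /** Parallelism of the common pool, which parallel streams use by default. */
    static int commonPoolParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        int processors = Runtime.getRuntime().availableProcessors();
        System.out.println("Available processors:    " + processors);
        System.out.println("Common pool parallelism: " + commonPoolParallelism());
    }
}
```

The value can also be overridden at startup with the system property `java.util.concurrent.ForkJoinPool.common.parallelism`, so the two numbers need not be related on every setup.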
The idea is to create a custom fork/join pool with the desired number of threads and execute the parallel stream within it. This lets developers control the threads that the parallel stream uses. Additionally, it separates the parallel stream's thread pool from the application's pool, which is considered good practice.
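A minimal sketch of that idea follows. It relies on the commonly observed behavior that a parallel stream started from inside a fork/join task runs in that task's pool; as far as I know this is not guaranteed by the stream documentation, so treat it as an assumption. The class `CustomPoolDemo`, the method `threadsUsed`, and the 100 ms pause are placeholders chosen for the example.

```java
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class CustomPoolDemo {
    /**
     * Runs a parallel stream from inside a dedicated pool and returns the
     * names of the threads that processed at least one element.
     */
    static Set<String> threadsUsed(int parallelism, int tasks) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // Explicit Callable type avoids any overload ambiguity in submit().
            Callable<Set<String>> job = () -> IntStream.range(0, tasks)
                    .parallel()
                    .mapToObj(i -> {
                        pause(100); // simulated blocking work
                        return Thread.currentThread().getName();
                    })
                    .collect(Collectors.toSet());
            // The stream is started by a worker of 'pool', not by the caller.
            return pool.submit(job).join();
        } finally {
            pool.shutdown();
        }
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadsUsed(8, 32));
    }
}
```

Shutting the pool down in `finally` matters here, because a pool created per call would otherwise keep its worker threads around.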
As explained in the second half of this answer, the thread utilization when processing HashMaps or HashSets depends on the distribution of the elements within the backing array, which depends on the hash codes. Especially with a small number of elements compared to the (default) capacity, this may result in bad work splitting.
A simple work-around is using new ArrayList<>(subTrees).parallelStream() instead of subTrees.parallelStream().
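The difference in work splitting can be made visible by splitting each collection's spliterator once by hand and counting the elements on both sides. This is only a sketch: `SplitDemo` and `firstSplitSizes` are names invented here, and the exact counts for the HashSet depend on the JDK's implementation, so no particular output is promised. The tendency to look for is a lopsided split for the small HashSet and a roughly even one for the ArrayList.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Spliterator;

final class SplitDemo {
    /** Splits the source's spliterator once; returns {prefix size, remainder size}. */
    static long[] firstSplitSizes(Collection<Integer> source) {
        Spliterator<Integer> rest = source.spliterator();
        Spliterator<Integer> prefix = rest.trySplit(); // null if it cannot split
        return new long[] { count(prefix), count(rest) };
    }

    /** Counts the elements a spliterator still covers (0 for null). */
    private static long count(Spliterator<Integer> s) {
        if (s == null) {
            return 0;
        }
        long[] n = { 0 };
        s.forEachRemaining(x -> n[0]++);
        return n[0];
    }

    public static void main(String[] args) {
        // Five elements in a set with default capacity, as in the question.
        Set<Integer> set = new HashSet<>();
        for (int i = 2; i <= 6; i++) {
            set.add(i);
        }
        List<Integer> list = new ArrayList<>(set);

        System.out.println("HashSet   first split: " + Arrays.toString(firstSplitSizes(set)));
        System.out.println("ArrayList first split: " + Arrays.toString(firstSplitSizes(list)));
    }
}
```

A hash-based spliterator divides the backing array by index range rather than by element count, so a half that happens to hold no occupied buckets gives a thread nothing to do.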
But note that your method performs the actual work of the current node (simulated in the example with a sleep) before processing the children, which also reduces the potential parallelism.
You may use
// Needs java.util.ArrayList and java.util.List imported in Tree.java.
public void sendCommandAll() {
    if(subTrees.isEmpty()) {
        // Leaf node: nothing to parallelize, just do the work.
        actualSendCommand();
        return;
    }
    // Treat this node as one more work item next to its children, so that
    // its own work runs concurrently with theirs.
    List<Tree<T>> tmp = new ArrayList<>(subTrees.size() + 1);
    tmp.addAll(subTrees);
    tmp.add(this);
    tmp.parallelStream().forEach(t -> {
        if(t != this) t.sendCommandAll(); else t.actualSendCommand();
    });
}

// The work for a single node, without any recursion into the children.
private void actualSendCommand() {
    if (data != null)
        System.out.println("[" + Thread.currentThread().getName()
            + "] sending command to " + data);
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    if (data != null)
        System.out.println("[" + Thread.currentThread().getName()
            + "] tree with data " + data + " got " + true);
}
This allows the current node to be processed concurrently with its children.