By default, parallel Java streams are processed by the common fork/join pool, which is constructed with default parameters. As answered in another question, these defaults can be adjusted either by submitting the stream task to a custom pool or by setting the java.util.concurrent.ForkJoinPool.common.parallelism system property.
However, I've been unable to increase the number of threads allocated to stream processing by either of these two methods. As an example, consider the program below, which processes a list of IP addresses contained in a file given as its first argument and outputs the resolved names. Running it on a file with about 13,000 unique IP addresses, Oracle Java Mission Control shows as few as 16 threads, of which only five are ForkJoinPool workers. Yet this particular task would benefit from many more threads, because the threads spend most of their time waiting for DNS responses. So my question is: how can I actually increase the number of threads used?
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ForkJoinPool;

/** Resolve IP addresses in file args[0] using 100 threads */
public class Resolve100 {

    /** Resolve the passed IP address into a name */
    static String addressName(String ipAddress) {
        try {
            return InetAddress.getByName(ipAddress).getHostName();
        } catch (UnknownHostException e) {
            return ipAddress;
        }
    }

    public static void main(String[] args) {
        Path path = Paths.get(args[0]);
        ForkJoinPool fjp = new ForkJoinPool(100);
        try {
            fjp.submit(() -> {
                try {
                    Files.lines(path)
                        .parallel()
                        .map(line -> addressName(line))
                        .forEach(System.out::println);
                } catch (IOException e) {
                    System.err.println("Failed: " + e);
                }
            }).get();
        } catch (Exception e) {
            System.err.println("Failed: " + e);
        }
    }
}
There are two problems with your approach. The first is that using a custom ForkJoinPool will not change the maximum number of individual tasks created by the Stream API, because that number is defined as follows:

static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2;

So even if you use a custom pool, the number of parallel tasks will be limited by commonPoolParallelism * 4. (It's actually not a hard limit but a target; in many cases, though, the number of tasks equals this number.)
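You can check the resulting target on your own JVM; note that LEAF_TARGET itself is an internal constant (in java.util.stream.AbstractTask), not public API, so this sketch simply reproduces its formula:

```java
import java.util.concurrent.ForkJoinPool;

public class LeafTargetCheck {
    public static void main(String[] args) {
        int parallelism = ForkJoinPool.getCommonPoolParallelism();
        // Mirrors the internal LEAF_TARGET computation: parallelism << 2
        int leafTarget = parallelism << 2;
        System.out.println("common pool parallelism: " + parallelism);
        System.out.println("task target (parallelism * 4): " + leafTarget);
    }
}
```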
The first problem can be fixed by using the java.util.concurrent.ForkJoinPool.common.parallelism system property, but here you hit another problem: Files.lines is parallelized really badly. See this question for details. In particular, for 13,000 input lines the maximum possible speedup is 3.17x (assuming every line takes roughly the same time to process), even if you have 100 CPUs. My StreamEx library provides a workaround for this (create the stream using StreamEx.ofLines(path).parallel()). Another possible solution is to read the file lines sequentially into a List, then create a parallel stream from it:
Files.readAllLines(path).parallelStream()...
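Putting the two together, a minimal sketch (the class name is illustrative, and the programmatic property-setting only takes effect if it runs before the common pool is first used; passing -Djava.util.concurrent.ForkJoinPool.common.parallelism=100 on the command line is the safer route):

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class ResolveWithProperty {

    static String addressName(String ipAddress) {
        try {
            return InetAddress.getByName(ipAddress).getHostName();
        } catch (UnknownHostException e) {
            return ipAddress;
        }
    }

    public static void main(String[] args) throws IOException {
        // Must be set before the common pool is initialized;
        // equivalent to -Djava.util.concurrent.ForkJoinPool.common.parallelism=100
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "100");
        Files.readAllLines(Paths.get(args[0])).parallelStream()
            .map(ResolveWithProperty::addressName)
            .forEach(System.out::println);
    }
}
```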
This works in combination with the system property. In general, however, the Stream API is not well suited to parallel processing when the tasks involve I/O. A more flexible solution is to use a CompletableFuture for each line:
ForkJoinPool fjp = new ForkJoinPool(100);
List<CompletableFuture<String>> list = Files.lines(path)
    .map(line -> CompletableFuture.supplyAsync(() -> addressName(line), fjp))
    .collect(Collectors.toList());

list.stream().map(CompletableFuture::join)
    .forEach(System.out::println);
This way you don't need to tweak the system property, and you can use separate pools for separate tasks.
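To confirm that the supplied tasks really run on the custom pool rather than the common pool, here is a minimal self-contained check that inspects worker thread names instead of doing DNS lookups (the class name is illustrative):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class PoolCheck {
    public static void main(String[] args) {
        ForkJoinPool fjp = new ForkJoinPool(100);
        // Each task reports the name of the thread it ran on
        List<CompletableFuture<String>> futures = IntStream.range(0, 10)
            .mapToObj(i -> CompletableFuture.supplyAsync(
                () -> Thread.currentThread().getName(), fjp))
            .collect(Collectors.toList());
        // Worker threads of a custom pool are named like "ForkJoinPool-1-worker-N"
        futures.forEach(f -> System.out.println(f.join()));
        fjp.shutdown();
    }
}
```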