How can I process a Java stream with more than the default number of threads?

By default, Java streams are processed by a common thread pool, which is constructed with default parameters. As has been answered in another question, one can adjust these defaults by specifying a custom pool or by setting the java.util.concurrent.ForkJoinPool.common.parallelism system property.
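For reference, a minimal sketch of the two approaches (the file name is a placeholder):

// Approach 1: raise the common pool's parallelism with the system property.
// It must be set before the common pool is first used, typically on the command line:
//   java -Djava.util.concurrent.ForkJoinPool.common.parallelism=100 Resolve100 addresses.txt
// or programmatically, as the very first statement in main():
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "100");

// Approach 2: run the parallel stream from inside a custom ForkJoinPool, as the
// full program below does; tasks forked from within the pool execute in that pool.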

However, I've been unable to increase the number of threads allocated to stream processing by either of these two methods. As an example, consider the program below, which processes a list of IP addresses contained in the file named by its first argument and outputs the resolved names. Running this on a file with about 13000 unique IP addresses, I see in Oracle Java Mission Control as few as 16 threads; of these, only five are ForkJoinPool workers. Yet this particular task would benefit greatly from many more threads, because they spend most of their time waiting for DNS responses. So my question is: how can I actually increase the number of threads used?

I've tried the program in three environments; these are the OS-reported thread counts.

  • Java SE Runtime Environment build 1.8.0_73-b02 on an 8-core machine running Windows 7: 17 threads
  • Java SE Runtime Environment build 1.8.0_66-b17 on a 2-core machine running OS X Darwin 15.2.0: 23 threads
  • openjdk version 1.8.0_72 on a 24-core machine running FreeBSD 11.0: 44 threads

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ForkJoinPool;

/** Resolve IP addresses in file args[0] using 100 threads */
public class Resolve100 {
    /** Resolve the passed IP address into a name */
    static String addressName(String ipAddress) {
        try {
            return InetAddress.getByName(ipAddress).getHostName();
        } catch (UnknownHostException e) {
            return ipAddress;
        }
    }

    public static void main(String[] args) {
        Path path = Paths.get(args[0]);
        // Run the parallel stream from inside a 100-thread custom pool;
        // tasks forked within the pool execute in that pool.
        ForkJoinPool fjp = new ForkJoinPool(100);
        try {
            fjp.submit(() -> {
                try {
                    Files.lines(path)
                    .parallel()
                    .map(line -> addressName(line))
                    .forEach(System.out::println);
                } catch (IOException e) {
                    System.err.println("Failed: " + e);
                }
            }).get();
        } catch (Exception e) {
            System.err.println("Failed: " + e);
        }
    }
}
asked Feb 23 '16 by Diomidis Spinellis

1 Answer

There are two problems with your approach. The first is that using a custom ForkJoinPool will not change the maximum number of individual tasks created by the Stream API, because that limit is defined in the following way:

static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2;

So even if you use a custom pool, the number of parallel tasks will be limited to commonPoolParallelism * 4 (it's not a hard limit but a target, though in many cases the number of tasks equals it).
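To see the target on a given JVM, here is a minimal sketch (the class name is mine; it merely reproduces the internal formula from java.util.stream.AbstractTask):

import java.util.concurrent.ForkJoinPool;

public class LeafTargetDemo {
    public static void main(String[] args) {
        int parallelism = ForkJoinPool.getCommonPoolParallelism();
        // Mirrors LEAF_TARGET = parallelism << 2, i.e. parallelism * 4
        System.out.println("common parallelism: " + parallelism);
        System.out.println("stream leaf target: " + (parallelism << 2));
    }
}

On a default 8-core setup (common-pool parallelism of 7) this prints a target of 28.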

The above problem can be fixed by using the java.util.concurrent.ForkJoinPool.common.parallelism system property, but then you hit another problem: Files.lines parallelizes really badly. See this question for details. In particular, for 13000 input lines the maximum possible speedup is 3.17x (assuming every line takes roughly the same time to process), even if you have 100 CPUs. My StreamEx library provides a workaround (create the stream with StreamEx.ofLines(path).parallel()). Another possible solution is to read the file lines sequentially into a List, then create a parallel stream from it:

Files.readAllLines(path).parallelStream()...
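Spelled out, a hedged sketch of that variant (it assumes the parallelism property above is set and reuses the asker's addressName helper):

// Eagerly read all lines into a List, whose spliterator splits evenly,
// then resolve them in parallel on the (enlarged) common pool.
Files.readAllLines(path).parallelStream()
    .map(Resolve100::addressName)
    .forEach(System.out::println);

// Or, with the StreamEx library, keep lazy reading and still split well:
// StreamEx.ofLines(path).parallel().map(Resolve100::addressName).forEach(System.out::println);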

This works together with the system property. In general, however, the Stream API is not well suited to parallel processing when tasks involve I/O. A more flexible solution is to create a CompletableFuture for each line:

ForkJoinPool fjp = new ForkJoinPool(100);
// Submit a future per line first so all lookups run concurrently, then join
// them in order (Files.lines throws IOException, so the enclosing method
// must handle or declare it).
List<CompletableFuture<String>> list = Files.lines(path)
    .map(line -> CompletableFuture.supplyAsync(() -> addressName(line), fjp))
    .collect(Collectors.toList());
list.stream().map(CompletableFuture::join)
    .forEach(System.out::println);

This way you don't need to tweak the system property, and you can use separate pools for separate tasks.
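For instance (a hedged sketch; both pools and the follow-up task are hypothetical), you could dedicate a large pool to the I/O-bound lookups and a small one to CPU-bound work:

ForkJoinPool dnsPool = new ForkJoinPool(100);   // many threads: they mostly wait on DNS
ForkJoinPool cpuPool = new ForkJoinPool(
        Runtime.getRuntime().availableProcessors());

CompletableFuture<String> name =
        CompletableFuture.supplyAsync(() -> addressName("8.8.8.8"), dnsPool);
CompletableFuture<Integer> length =
        name.thenApplyAsync(String::length, cpuPool);  // continues on the CPU pool

System.out.println(length.join());
dnsPool.shutdown();
cpuPool.shutdown();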

answered Oct 21 '22 by Tagir Valeev