
Java parallelStream does not use expected number of threads

Java 8 parallelStream seems to use more threads than the number specified by the system property java.util.concurrent.ForkJoinPool.common.parallelism. The unit tests below show that tasks run on the desired number of threads when I use my own ForkJoinPool, but with parallelStream on the common pool the number of threads is higher than expected.

import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertTrue;

public class ParallelStreamTest {

    private static final int TOTAL_TASKS = 1000;

    @Test
    public void testParallelStreamWithParallelism1() throws InterruptedException {
        final Integer maxThreads = 1;
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", maxThreads.toString());
        List<Integer> objects = new ArrayList<>();
        for (int i = 0; i < TOTAL_TASKS; i++) {
            objects.add(i);
        }

        final AtomicInteger concurrentThreads = new AtomicInteger(0);
        final AtomicInteger taskCount = new AtomicInteger(0);

        objects.parallelStream().forEach(i -> {
            processTask(concurrentThreads, maxThreads); // expected to be called one at a time
            taskCount.addAndGet(1);
        });

        assertTrue(taskCount.get() == TOTAL_TASKS);
    }

    @Test
    public void testMyOwnForkJoinPoolWithParallelism1() throws InterruptedException {
        final Integer threads = 1;
        List<Integer> objects = new ArrayList<>();
        for (int i = 0; i < TOTAL_TASKS; i++) {
            objects.add(i);
        }

        ForkJoinPool forkJoinPool = new ForkJoinPool(threads);
        final AtomicInteger concurrentThreads = new AtomicInteger(0);
        final AtomicInteger taskCount = new AtomicInteger(0);

        forkJoinPool.submit(() -> objects.parallelStream().forEach(i -> {
            processTask(concurrentThreads, threads); // expected to be called one at a time
            taskCount.addAndGet(1);
        }));
        forkJoinPool.shutdown();
        forkJoinPool.awaitTermination(1, TimeUnit.MINUTES);

        assertTrue(taskCount.get() == TOTAL_TASKS);
    }

    /**
     * It simply processes a task increasing first the concurrentThreads count
     *
     * @param concurrentThreads Counter for threads processing tasks
     * @param maxThreads Maximum number of threads that are expected to be used for processing tasks
     */
    private void processTask(AtomicInteger concurrentThreads, int maxThreads) {
        int currentConcurrentThreads = concurrentThreads.addAndGet(1);
        if (currentConcurrentThreads > maxThreads) {
            throw new IllegalStateException("There should be no more than " + maxThreads + " concurrent thread(s) but found " + currentConcurrentThreads);
        }

        // actual processing would go here

        concurrentThreads.decrementAndGet();
    }
}

Only one thread should be processing tasks, since the custom ForkJoinPool has parallelism=1 and java.util.concurrent.ForkJoinPool.common.parallelism=1. Both tests should therefore pass, but testParallelStreamWithParallelism1 fails with:

java.lang.IllegalStateException: There should be no more than 1 concurrent thread(s) but found 2

It seems that setting java.util.concurrent.ForkJoinPool.common.parallelism=1 is not working as expected, because more than one task is processed at the same time.

Any ideas?

jenarros asked Mar 10 '23 18:03


2 Answers

The parallelism setting of the Fork/Join pool determines the number of pool worker threads, but since the caller thread (e.g. the main thread) works on the jobs too, there is always one more thread when using the common pool. That’s why the default parallelism of the common pool is “number of cores minus one”: the actual number of working threads then equals the number of cores.
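To see the caller thread taking part, you can record which threads actually run the stream's elements. This is only a rough sketch: the class name CommonPoolThreads and the helper threadsUsed are made up for illustration, and which threads show up depends on the JDK's current stream implementation, not on any specified behavior.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

// Hypothetical helper class, not part of the question's code.
final class CommonPoolThreads {

    /** Runs a parallel stream and returns the names of all threads that processed an element. */
    static Set<String> threadsUsed(int elements) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(0, elements).parallel()
                .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        // The reported parallelism counts pool workers only, not the calling thread.
        System.out.println("common pool parallelism = " + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("calling thread          = " + Thread.currentThread().getName());
        System.out.println("threads that did work   = " + threadsUsed(100_000));
    }
}
```

On the JDKs where I would expect this to behave as described, the calling thread's name is listed next to the common pool workers, but treat that as an observation to verify on your own runtime.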

With your custom Fork/Join pool, the caller thread of the stream operation is already a worker thread of the pool, hence, utilizing it for processing jobs doesn’t increase the actual number of working threads.
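The same kind of check can be done for a custom pool. The sketch below (CustomPoolThreads and threadsUsed are hypothetical names again) submits the stream from inside the pool and blocks the caller in join(), so the caller only waits and the recorded names should all belong to the pool's own workers. It relies on the current, unspecified behavior that a stream started inside a Fork/Join task runs in that task's pool.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

// Hypothetical helper class, not part of the question's code.
final class CustomPoolThreads {

    /** Runs a parallel stream inside a dedicated pool and returns the names of the threads used. */
    static Set<String> threadsUsed(int parallelism, int elements) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // join() blocks the caller until the stream is done and rethrows any failure,
            // unlike a bare submit(), which would silently swallow exceptions.
            pool.submit(() -> IntStream.range(0, elements).parallel()
                    .forEach(i -> names.add(Thread.currentThread().getName())))
                .join();
        } finally {
            pool.shutdown();
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println("calling thread        = " + Thread.currentThread().getName());
        System.out.println("threads that did work = " + threadsUsed(1, 100_000));
    }
}
```

Using join() (or get()) on the submitted task is also worth doing in the question's test, since otherwise an IllegalStateException thrown inside the pool only shows up indirectly as a wrong task count.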

It must be emphasized that the interaction between the Stream implementation and the Fork/Join pool is entirely unspecified, as the fact that streams use the Fork/Join framework under the hood is an implementation detail. There is no guarantee that changing the default pool’s properties has any effect on streams, nor that calling stream operations from within a custom Fork/Join pool’s task will use that custom pool.

Holger answered Mar 13 '23 00:03


Set this parameter as well:

    System.setProperty("java.util.concurrent.ForkJoinPool.common.maximumSpares", "0");

This worked for me. Apparently (although this is not very well documented), the default ForkJoinPool is allowed 'spare' threads that can pick up work.
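One thing to watch: the common pool reads these properties once, when it is first initialized, so they have to be set before anything touches ForkJoinPool or a parallel stream (or be passed as -D options on the command line). Here is a small sketch for measuring the effect instead of assuming it. CommonPoolSettings and peakConcurrency are names I made up, and I am not claiming a particular result, since it may vary with the JDK version.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

// Hypothetical helper class, not part of the question's code.
final class CommonPoolSettings {

    /** Returns the highest number of elements observed in flight at the same time. */
    static int peakConcurrency(int tasks) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        IntStream.range(0, tasks).parallel().forEach(i -> {
            int now = inFlight.incrementAndGet();
            peak.accumulateAndGet(now, Math::max); // remember the largest value seen
            // actual processing would go here
            inFlight.decrementAndGet();
        });
        return peak.get();
    }

    public static void main(String[] args) {
        // Set both properties before the common pool is used for the first time;
        // setting them later has no effect on an already initialized pool.
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "1");
        System.setProperty("java.util.concurrent.ForkJoinPool.common.maximumSpares", "0");
        System.out.println("peak concurrency = " + peakConcurrency(1000));
    }
}
```

In a JUnit run the pool may already have been initialized by an earlier test or by the runner itself, which is another reason to pass the values as JVM arguments when the limit really matters.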

Renats Stozkovs answered Mar 13 '23 01:03