Nested Java 8 parallel forEach loops perform poorly. Is this behavior expected?

Note: I already addressed this problem in another SO post - Using a semaphore inside a nested Java 8 parallel stream action may DEADLOCK. Is this a bug? - but the title of that post suggested that the problem is related to the use of a semaphore, which somewhat distracted the discussion. I am creating this one to stress that nested loops might have a performance issue, although both problems likely have a common cause (and maybe because it took me a lot of time to figure this out). (I don't see it as a duplicate, because it stresses another symptom - but if you do, just delete it.)

Problem: If you nest two Java 8 stream.parallel().forEach loops and all tasks are independent, stateless, etc. - except for being submitted to the common FJ pool - then nesting a parallel loop inside a parallel loop performs much worse than nesting a sequential loop inside a parallel loop. Even worse: if the operation containing the inner loop is synchronized, you will get a DEADLOCK.

Demonstration of the performance issue

Even without the synchronized you can still observe a performance problem. Demo code for this is available at: http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachTest.java (see the JavaDoc there for a more detailed description).

Our setup here is as follows: We have a nested stream.parallel().forEach().

  • The inner loop is independent (stateless, no interference, etc. - except for the use of a common pool) and consumes 1 second in total in the worst case, namely if processed sequentially.
  • Half of the tasks of the outer loop consume 10 seconds prior to that loop.
  • The other half consume 10 seconds after that loop.
  • Hence every outer-loop task consumes 11 seconds (worst case) in total.
  • We have a boolean which allows us to switch the inner loop from parallel() to sequential() (see the sketch below).
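
A minimal sketch of this setup (the structure follows the description above; the sleep-based work, the inner task count of 100, and which half of the outer tasks sleeps first are illustrative assumptions - the linked demo is the authoritative version):

    import java.util.stream.IntStream;

    public class NestedParallelSetupSketch {

        static final boolean isInnerStreamParallel = true; // switches the inner loop between parallel and sequential

        public static void main(String[] args) {
            long start = System.nanoTime();
            // Outer loop: 24 independent tasks, run on the common pool (parallelism 8 assumed).
            IntStream.range(0, 24).parallel().forEach(i -> {
                if (i < 12) sleep(10_000);            // half of the outer tasks: 10 s before the inner loop

                // Inner loop: 100 independent tasks of 10 ms each (1 s worst case if run sequentially).
                IntStream inner = IntStream.range(0, 100);
                if (isInnerStreamParallel) inner = inner.parallel();
                inner.forEach(j -> sleep(10));

                if (i >= 12) sleep(10_000);           // the other half: 10 s after the inner loop
            });
            System.out.println("Done in " + (System.nanoTime() - start) / 1_000_000_000L + " s");
        }

        static void sleep(long millis) {
            try { Thread.sleep(millis); } catch (InterruptedException e) { throw new AssertionError(e); }
        }
    }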

Now: submitting 24 outer-loop tasks to a pool with parallelism 8, we would expect at best 24/8 * 11 = 33 seconds (on a machine with 8 or more cores).

The result is:

  • With inner sequential loop: 33 seconds.
  • With inner parallel loop: >80 seconds (I had 92 seconds).

Question: Can you confirm this behavior? Is this something one would expect from the framework? (I am a bit more careful now with claiming that this is a bug, but I personally believe that it is due to a bug in the implementation of ForkJoinTask. Remark: I have posted this to concurrency-interest (see http://cs.oswego.edu/pipermail/concurrency-interest/2014-May/012652.html ), but so far I have not received confirmation from there.)

Demonstration of the deadlock

The following code will DEADLOCK

    // Outer loop
    IntStream.range(0,numberOfTasksInOuterLoop).parallel().forEach(i -> {
        doWork();
        synchronized(this) {
            // Inner loop
            IntStream.range(0,numberOfTasksInInnerLoop).parallel().forEach(j -> {
                doWork();
            });
        }
    });

where numberOfTasksInOuterLoop = 24, numberOfTasksInInnerLoop = 240, outerLoopOverheadFactor = 10000 and doWork is some stateless CPU burner.
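
A stateless CPU burner in the spirit of doWork could look like the sketch below (illustrative only; the linked demo defines its own version, scaled by outerLoopOverheadFactor):

    // Burns roughly a fixed amount of CPU time without blocking and without
    // touching any shared state. The 1 ms amount is an illustrative assumption.
    static void doWork() {
        long end = System.nanoTime() + 1_000_000L;
        while (System.nanoTime() < end) {
            // spin
        }
    }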

You can find complete demo code at http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachAndSynchronization.java (see the JavaDoc there for a more detailed description).

Is this behavior expected? Note that the documentation on Java parallel streams does not mention any issue with nesting or synchronization. Also, the fact that all parallel streams share a common fork-join pool is not mentioned.
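
That every parallel stream runs on the same common pool is easy to observe by printing the worker thread names; a small sketch (not part of the linked demos):

    import java.util.stream.IntStream;

    public class CommonPoolObservation {
        public static void main(String[] args) {
            IntStream.range(0, 4).parallel().forEach(i ->
                IntStream.range(0, 4).parallel().forEach(j ->
                    // Both nesting levels report names like "ForkJoinPool.commonPool-worker-N"
                    // (plus "main", since the calling thread participates as well).
                    System.out.println(i + "/" + j + " on " + Thread.currentThread().getName())));
        }
    }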

Update

Another test of the performance issue can be found at http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachBenchmark.java - this test comes without any blocking operation (no Thread.sleep and no synchronized). I compiled some more remarks here: http://christian-fries.de/blog/files/2014-nested-java-8-parallel-foreach.html

Update 2

It appears as if this problem and the more severe DEADLOCK with semaphores have been fixed in Java 8u40.

asked May 06 '14 by Christian Fries


3 Answers

The problem is that the rather limited parallelism you have configured is eaten up by the outer stream processing: if you say that you want eight threads and process a stream of more than eight items with parallel() it will create eight worker threads and let them process items.

Then, within your consumer, you are processing another stream using parallel(), but there are no worker threads left. Since the worker threads are blocked waiting for the end of the inner stream processing, the ForkJoinPool has to create new worker threads, which violates your configured parallelism. It seems that it does not recycle these extra threads but lets them die right after processing. So within your inner processing, new threads are created and disposed of, which is an expensive operation.
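
This growth of the pool can be observed directly via ForkJoinPool.commonPool().getPoolSize(); a rough sketch, assuming the sleep-based setup from the question (task counts are illustrative):

    import java.util.concurrent.ForkJoinPool;
    import java.util.stream.IntStream;

    public class PoolSizeObservation {
        public static void main(String[] args) {
            ForkJoinPool commonPool = ForkJoinPool.commonPool();
            // Outer parallel loop whose workers block while waiting for a nested parallel loop.
            IntStream.range(0, 24).parallel().forEach(i -> {
                IntStream.range(0, 100).parallel().forEach(j -> sleep(10));
                // getPoolSize() reports the number of started worker threads; with nested
                // blocking loops it typically exceeds the configured parallelism because
                // blocked workers are compensated by newly created ones.
                System.out.println("parallelism=" + commonPool.getParallelism()
                        + " poolSize=" + commonPool.getPoolSize());
            });
        }

        static void sleep(long millis) {
            try { Thread.sleep(millis); } catch (InterruptedException e) { throw new AssertionError(e); }
        }
    }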

You might see it as a flaw that the initiating threads do not contribute to the computation of a parallel stream but just wait for the result. Yet even if that were fixed, you would still have a general problem that is hard (if not impossible) to fix:

Whenever the ratio of worker threads to outer stream items is low, the implementation will use all of them for the outer stream, as it doesn't know that this stream is an outer stream. So executing an inner stream in parallel requests more worker threads than are available. Using the caller thread to contribute to the computation could fix it in a way that the performance equals that of the sequential computation, but getting an advantage from parallel execution here does not work well with the concept of a fixed number of worker threads.

Note that you are only scratching the surface of this problem here, as you have rather balanced processing times for the items. If the processing times of inner and outer items diverge (compared to items on the same level), the problem will be even worse.


Update: by profiling and looking at the code, it seems that the ForkJoinPool does attempt to use the waiting thread for "work stealing", but using different code depending on whether the thread is a worker thread or some other thread. As a result, a worker thread is actually waiting about 80% of the time and doing very little to no work, while other threads really contribute to the computation…


Update 2: for completeness, here is the simple parallel execution approach described in the comments. Since it enqueues every item, it is expected to have too much overhead when the execution time for a single item is rather small. So it's not a sophisticated solution but rather a demonstration that it is possible to handle long-running tasks without much magic…

import java.lang.reflect.UndeclaredThrowableException;
import java.util.concurrent.*;
import java.util.function.IntConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class NestedParallelForEachTest1 {
    static final boolean isInnerStreamParallel = true;

    // Setup: Inner loop task 0.01 sec in worst case. Outer loop task: 10 sec + inner loop. This setup: (100 * 0.01 sec + 10 sec) * 24/8 = 33 sec.
    static final int numberOfTasksInOuterLoop = 24;  // In real applications this can be a large number (e.g. > 1000).
    static final int numberOfTasksInInnerLoop = 100; // In real applications this can be a large number (e.g. > 1000).
    static final int concurrentExecutionsLimitForStreams = 8;

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        System.out.println(System.getProperty("java.version")+" "+System.getProperty("java.home"));
        new NestedParallelForEachTest1().testNestedLoops();
        E.shutdown();
    }

    final static ThreadPoolExecutor E = new ThreadPoolExecutor(
        concurrentExecutionsLimitForStreams, concurrentExecutionsLimitForStreams,
        2, TimeUnit.MINUTES, new SynchronousQueue<>(), (r,e)->r.run() );

    public static void parallelForEach(IntStream s, IntConsumer c) {
        s.mapToObj(i->E.submit(()->c.accept(i))).collect(Collectors.toList())
         .forEach(NestedParallelForEachTest1::waitOrHelp);
    }
    static void waitOrHelp(Future f) {
        while(!f.isDone()) {
            Runnable r=E.getQueue().poll();
            if(r!=null) r.run();
        }
        try { f.get(); }
        catch(InterruptedException ex) { throw new RuntimeException(ex); }
        catch(ExecutionException eex) {
            Throwable t=eex.getCause();
            if(t instanceof RuntimeException) throw (RuntimeException)t;
            if(t instanceof Error) throw (Error)t;
            throw new UndeclaredThrowableException(t);
        }
    }
    public void testNestedLoops() {
        long start = System.nanoTime();
        // Outer loop
        parallelForEach(IntStream.range(0,numberOfTasksInOuterLoop), i -> {
            if(i < 10) sleep(10 * 1000);
            if(isInnerStreamParallel) {
                // Inner loop as parallel: worst case (sequential) it takes 10 * numberOfTasksInInnerLoop millis
                parallelForEach(IntStream.range(0,numberOfTasksInInnerLoop), j -> sleep(10));
            }
            else {
                // Inner loop as sequential
                IntStream.range(0,numberOfTasksInInnerLoop).sequential().forEach(j -> sleep(10));
            }
            if(i >= 10) sleep(10 * 1000);
        });
        long end = System.nanoTime();
        System.out.println("Done in "+TimeUnit.NANOSECONDS.toSeconds(end-start)+" sec.");
    }
    static void sleep(int milli) {
        try {
            Thread.sleep(milli);
        } catch (InterruptedException ex) {
            throw new AssertionError(ex);
        }
    }
}
answered by Holger


I can confirm this is still a performance issue in 8u72, although it will no longer deadlock. Parallel terminal operations are still executed with ForkJoinTask instances outside of any explicit ForkJoinPool context, which means every parallel stream still shares the common pool.

To demonstrate a simple pathological case:

import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class ParallelPerf {

    private static final Object LOCK = new Object();

    private static void runInNewPool(Runnable task) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.submit(task).join();
        } finally {
            pool.shutdown();
        }
    }

    private static <T> T runInNewPool(Callable<T> task) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.submit(task).join();
        } finally {
            pool.shutdown();
        }
    }

    private static void innerLoop() {
        IntStream.range(0, 32).parallel().forEach(i -> {
//          System.out.println(Thread.currentThread().getName());
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    public static void main(String[] args) {
        System.out.println("==DEFAULT==");
        long startTime = System.nanoTime();
        IntStream.range(0, 32).parallel().forEach(i -> {
            synchronized (LOCK) {
                innerLoop();
            }
//          System.out.println(" outer: " + Thread.currentThread().getName());
        });
        System.out.println(System.nanoTime() - startTime);

        System.out.println("==NEW POOLS==");
        startTime = System.nanoTime();
        IntStream.range(0, 32).parallel().forEach(i -> {
            synchronized (LOCK) {
                runInNewPool(() -> innerLoop());
            }
//          System.out.println(" outer: " + Thread.currentThread().getName());
        });
        System.out.println(System.nanoTime() - startTime);
    }
}

The second run passes innerLoop to runInNewPool instead of calling it directly. On my machine (i7-4790, 8 CPU threads), I get about a 4x speed-up:

==DEFAULT==
4321223964
==NEW POOLS==
1015314802

Uncommenting the other print statements makes the problem obvious:

[...]
ForkJoinPool.commonPool-worker-6
ForkJoinPool.commonPool-worker-6
ForkJoinPool.commonPool-worker-6
 outer: ForkJoinPool.commonPool-worker-6
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-3
[...]
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-3
 outer: ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-4
ForkJoinPool.commonPool-worker-4
[...]

The common pool worker threads pile up at the synchronized block, with only one thread able to enter at a time. Since the inner parallel operation uses the same pool, and all the other threads in the pool are waiting for the lock, we get single-threaded execution.

And the result of using separate ForkJoinPool instances:

[...]
ForkJoinPool-1-worker-0
ForkJoinPool-1-worker-6
ForkJoinPool-1-worker-5
 outer: ForkJoinPool.commonPool-worker-4
ForkJoinPool-2-worker-1
ForkJoinPool-2-worker-5
[...]
ForkJoinPool-2-worker-7
ForkJoinPool-2-worker-3
 outer: ForkJoinPool.commonPool-worker-1
ForkJoinPool-3-worker-2
ForkJoinPool-3-worker-5
[...]

We still have the inner loop running on one worker thread at a time, but the inner parallel operation gets a fresh pool each time and can utilize all of its worker threads.

This is a contrived example, but removing the synchronized blocks still shows a similar difference in speed, since the inner and outer loops are still competing for the same worker threads. Multithreaded applications need to be careful when using parallel streams in multiple threads, as this can result in random slowdowns when they overlap.

This is an issue with all terminal operations, not just forEach, since they all run tasks in the common pool. I'm using the runInNewPool methods above as a workaround, but hopefully this will be built into the standard library at some point.
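
For reference, applied to a nested stream the workaround might look like the following sketch (the task counts and the printed output are illustrative):

    import java.util.concurrent.ForkJoinPool;
    import java.util.stream.IntStream;

    public class NestedStreamWorkaroundSketch {
        // Same idea as the runInNewPool(Runnable) method above: give the inner stream its own pool.
        private static void runInNewPool(Runnable task) {
            ForkJoinPool pool = new ForkJoinPool();
            try {
                pool.submit(task).join();
            } finally {
                pool.shutdown();
            }
        }

        public static void main(String[] args) {
            // The outer loop runs on the common pool; each inner loop gets a throwaway pool,
            // so blocked or busy outer workers cannot starve the inner computation.
            IntStream.range(0, 24).parallel().forEach(i ->
                runInNewPool(() ->
                    IntStream.range(0, 100).parallel().forEach(j ->
                        System.out.println(i + "/" + j + " on " + Thread.currentThread().getName()))));
        }
    }

Creating and shutting down a pool per task has its own cost, so this only pays off when the work inside the inner loop is substantial.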

answered by Sean Van Gorder


After tidying the code a little, I don't see the same results with Java 8 update 45. There is undoubtedly an overhead, but it is very small compared to the time spans you are talking about.

The potential for a deadlock is expected, as you are consuming all the available threads in the pool with the outer loop, leaving no threads to execute the inner loop.
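
One defensive pattern (a sketch added here for illustration, not taken from this answer) is to parallelize the inner loop only when the current thread is not already a fork-join worker, using ForkJoinTask.inForkJoinPool(); doWork below is a placeholder:

    import java.util.concurrent.ForkJoinTask;
    import java.util.stream.IntStream;

    public class ConditionalInnerParallelism {
        static void innerLoop(int numberOfTasks) {
            IntStream range = IntStream.range(0, numberOfTasks);
            // If we are already running inside a ForkJoinPool worker (e.g. because the caller
            // is an outer parallel stream), keep the inner loop sequential so it does not
            // compete for, or wait on, the same worker threads.
            if (!ForkJoinTask.inForkJoinPool()) {
                range = range.parallel();
            }
            range.forEach(ConditionalInnerParallelism::doWork);
        }

        static void doWork(int j) {
            // placeholder for the real per-item work
        }

        public static void main(String[] args) {
            IntStream.range(0, 24).parallel().forEach(i -> innerLoop(100));
        }
    }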

The following program prints

isInnerStreamParallel: false, isCPUTimeBurned: false
java.util.concurrent.ForkJoinPool.common.parallelism = 8
Done in 33.1 seconds.
isInnerStreamParallel: false, isCPUTimeBurned: true
java.util.concurrent.ForkJoinPool.common.parallelism = 8
Done in 33.0 seconds.
isInnerStreamParallel: true, isCPUTimeBurned: false
java.util.concurrent.ForkJoinPool.common.parallelism = 8
Done in 32.5 seconds.
isInnerStreamParallel: true, isCPUTimeBurned: true
java.util.concurrent.ForkJoinPool.common.parallelism = 8
Done in 32.6 seconds.

The code

import java.util.stream.IntStream;

public class NestedParallelForEachTest {
    // Setup: Inner loop task 0.01 sec in worst case. Outer loop task: 10 sec + inner loop. This setup: (100 * 0.01 sec + 10 sec) * 24/8 = 33 sec.
    static final int numberOfTasksInOuterLoop = 24;  // In real applications this can be a large number (e.g. > 1000).
    static final int numberOfTasksInInnerLoop = 100;                // In real applications this can be a large number (e.g. > 1000).
    static final int concurrentExecutionsLimitForStreams    = 8;    // java.util.concurrent.ForkJoinPool.common.parallelism

    public static void main(String[] args) {
        testNestedLoops(false, false);
        testNestedLoops(false, true);
        testNestedLoops(true, false);
        testNestedLoops(true, true);
    }

    public static void testNestedLoops(boolean isInnerStreamParallel, boolean isCPUTimeBurned) {
        System.out.println("isInnerStreamParallel: " + isInnerStreamParallel + ", isCPUTimeBurned: " + isCPUTimeBurned);
        long start = System.nanoTime();

        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism",Integer.toString(concurrentExecutionsLimitForStreams));
        System.out.println("java.util.concurrent.ForkJoinPool.common.parallelism = " + System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism"));

        // Outer loop
        IntStream.range(0, numberOfTasksInOuterLoop).parallel().forEach(i -> {
//            System.out.println(i + "\t" + Thread.currentThread());
            if(i < 10) burnTime(10 * 1000, isCPUTimeBurned);

            IntStream range = IntStream.range(0, numberOfTasksInInnerLoop);
            if (isInnerStreamParallel) {
                // Inner loop as parallel: worst case (sequential) it takes 10 * numberOfTasksInInnerLoop millis
                range = range.parallel();
            } else {
                // Inner loop as sequential
            }
            range.forEach(j -> burnTime(10, isCPUTimeBurned));

            if(i >= 10) burnTime(10 * 1000, isCPUTimeBurned);
        });

        long end = System.nanoTime();

        System.out.printf("Done in %.1f seconds.%n", (end - start) / 1e9);
    }

    static void burnTime(long millis, boolean isCPUTimeBurned) {
        if (isCPUTimeBurned) {
            long end = System.nanoTime() + millis * 1000000;
            while (System.nanoTime() < end)
                ;

        } else {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                throw new AssertionError(e);
            }
        }
    }
}
answered by Peter Lawrey