
Using a semaphore inside a nested Java 8 parallel stream action may DEADLOCK. Is this a bug?

Consider the following situation: We are using a Java 8 parallel stream to perform a parallel forEach loop, e.g.,

IntStream.range(0,20).parallel().forEach(i -> { /* work done here */}) 

The number of parallel threads is controlled by the system property "java.util.concurrent.ForkJoinPool.common.parallelism". By default it is one less than the number of available processors, because the calling thread also takes part in the work.
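As a quick check of the setting described above, the following minimal sketch prints the parallelism the common pool actually uses. The class and method names are illustrative and not part of the original question. Note that the system property is only read when the common pool is initialized, so it has to be set before the pool is first used.

```java
import java.util.concurrent.ForkJoinPool;

final class CommonPoolParallelism {

    /** Returns the target parallelism of the common pool used by parallel streams. */
    static int commonPoolParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        System.out.println("available processors    = " + Runtime.getRuntime().availableProcessors());
        System.out.println("common pool parallelism = " + commonPoolParallelism());
    }
}
```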

Now assume that we would like to limit the number of parallel executions for a specific piece of work, e.g. because that part is memory intensive and memory constraints impose a limit on the number of parallel executions.

An obvious and elegant way to limit parallel executions is to use a Semaphore (suggested here), e.g., the following piece of code limits the number of parallel executions to 5:

    final Semaphore concurrentExecutions = new Semaphore(5);
    IntStream.range(0,20).parallel().forEach(i -> {
        concurrentExecutions.acquireUninterruptibly();
        try {
            /* WORK DONE HERE */
        }
        finally {
            concurrentExecutions.release();
        }
    });

This works just fine!

However: Using any other parallel stream inside the worker (at /* WORK DONE HERE */) may result in a deadlock.

To me, this is unexpected behavior.

Explanation: Since Java streams use a ForkJoin pool, the inner forEach is forking, and the join appears to be waiting forever. However, this behavior is still unexpected. Note that parallel streams work even if you set "java.util.concurrent.ForkJoinPool.common.parallelism" to 1.

Note also that it may not be obvious whether the code called by the worker uses an inner parallel forEach.

Question: Is this behavior in accordance with the Java 8 specification (in that case it would imply that the use of Semaphores inside parallel streams workers is forbidden) or is this a bug?

For convenience: Below is a complete test case. Any combination of the two booleans works, except "true, true", which results in the deadlock.

Clarification: To make the point clear, let me stress one aspect: The deadlock does not occur at the acquire of the semaphore. Note that the code consists of

  1. acquire semaphore
  2. run some code
  3. release semaphore

and the deadlock occurs at 2. if that piece of code is using ANOTHER parallel stream. Then the deadlock occurs inside that OTHER stream. As a consequence it appears that it is not allowed to use nested parallel streams and blocking operations (like a semaphore) together!

Note that it is documented that parallel streams use a ForkJoinPool and that ForkJoinPool and Semaphore belong to the same package - java.util.concurrent (so one would expect that they interoperate nicely).

    /*
     * (c) Copyright Christian P. Fries, Germany. All rights reserved. Contact: [email protected].
     *
     * Created on 03.05.2014
     */
    package net.finmath.experiments.concurrency;

    import java.util.concurrent.Semaphore;
    import java.util.stream.IntStream;

    /**
     * This is a test of Java 8 parallel streams.
     *
     * The idea behind this code is that the Semaphore concurrentExecutions
     * should limit the parallel executions of the outer forEach (which is an
     * <code>IntStream.range(0,numberOfTasks).parallel().forEach</code> (for example:
     * the parallel executions of the outer forEach should be limited due to a
     * memory constrain).
     *
     * Inside the execution block of the outer forEach we use another parallel stream
     * to create an inner forEach. The number of concurrent
     * executions of the inner forEach is not limited by us (it is however limited by a
     * system property "java.util.concurrent.ForkJoinPool.common.parallelism").
     *
     * Problem: If the semaphore is used AND the inner forEach is active, then
     * the execution will be DEADLOCKED.
     *
     * Note: A practical application is the implementation of the parallel
     * LevenbergMarquardt optimizer in
     * {@link http://finmath.net/java/finmath-lib/apidocs/net/finmath/optimizer/LevenbergMarquardt.html}
     * In one application the number of tasks in the outer and inner loop is very large (>1000)
     * and due to memory limitation the outer loop should be limited to a small (5) number
     * of concurrent executions.
     *
     * @author Christian Fries
     */
    public class ForkJoinPoolTest {

        public static void main(String[] args) {

            // Any combination of the booleans works, except (true,true)
            final boolean isUseSemaphore    = true;
            final boolean isUseInnerStream  = true;

            final int       numberOfTasksInOuterLoop = 20;              // In real applications this can be a large number (e.g. > 1000).
            final int       numberOfTasksInInnerLoop = 100;             // In real applications this can be a large number (e.g. > 1000).
            final int       concurrentExecusionsLimitInOuterLoop = 5;
            final int       concurrentExecutionsLimitForStreams = 10;

            final Semaphore concurrentExecutions = new Semaphore(concurrentExecusionsLimitInOuterLoop);

            System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism",Integer.toString(concurrentExecutionsLimitForStreams));
            System.out.println("java.util.concurrent.ForkJoinPool.common.parallelism = " + System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism"));

            IntStream.range(0,numberOfTasksInOuterLoop).parallel().forEach(i -> {

                if(isUseSemaphore) {
                    concurrentExecutions.acquireUninterruptibly();
                }

                try {
                    System.out.println(i + "\t" + concurrentExecutions.availablePermits() + "\t" + Thread.currentThread());

                    if(isUseInnerStream) {
                        runCodeWhichUsesParallelStream(numberOfTasksInInnerLoop);
                    }
                    else {
                        try {
                            Thread.sleep(10*numberOfTasksInInnerLoop);
                        } catch (Exception e) {
                        }
                    }
                }
                finally {
                    if(isUseSemaphore) {
                        concurrentExecutions.release();
                    }
                }
            });

            System.out.println("D O N E");
        }

        /**
         * Runs code in a parallel forEach using streams.
         *
         * @param numberOfTasksInInnerLoop Number of tasks to execute.
         */
        private static void runCodeWhichUsesParallelStream(int numberOfTasksInInnerLoop) {
            IntStream.range(0,numberOfTasksInInnerLoop).parallel().forEach(j -> {
                try {
                    Thread.sleep(10);
                } catch (Exception e) {
                }
            });
        }
    }

Christian Fries asked May 03 '14 08:05



2 Answers

Any time you decompose a problem into tasks, where those tasks could be blocked on other tasks, and try to execute them in a finite thread pool, you are at risk of pool-induced deadlock. See Java Concurrency in Practice 8.1.

This is unquestionably a bug -- in your code. You're filling up the FJ pool with tasks that are going to block waiting for the results of other tasks in the same pool. Sometimes you get lucky and things manage to not deadlock (just like not all lock-ordering errors result in deadlock all the time), but fundamentally you're skating on some very thin ice here.
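The general hazard can be reproduced without streams. The following is a minimal sketch, not taken from the original answer, that uses a plain fixed-size executor as a stand-in for a finite pool: an outer task waits for an inner task submitted to the same pool. The class name, method name, and timeout values are illustrative assumptions. A timeout is used so that the demonstration terminates instead of hanging.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class PoolInducedDeadlockDemo {

    /**
     * Submits an outer task that waits for an inner task in the same pool.
     * Returns true if the outer task finished within the timeout.
     */
    static boolean outerTaskCompletes(int poolSize, long timeoutMillis) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            Future<String> outer = pool.submit(() -> {
                Future<String> inner = pool.submit(() -> "inner result");
                return inner.get(); // blocks a pool thread until the inner task has run
            });
            try {
                outer.get(timeoutMillis, TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException e) {
                return false; // the outer task waits for a thread that never becomes free
            }
        } finally {
            pool.shutdownNow(); // interrupts a stuck task so the JVM can exit
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("pool size 1: completes = " + outerTaskCompletes(1, 500));
        System.out.println("pool size 2: completes = " + outerTaskCompletes(2, 500));
    }
}
```

With a single thread the outer task occupies the only worker, so the inner task can never start and the call times out. With two threads the inner task normally runs on the second worker. A ForkJoinPool differs in its details (joining workers can help run other tasks), but a worker blocked on a semaphore cannot help in the same way.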

Brian Goetz answered Oct 21 '22 16:10


I ran your test in a profiler (VisualVM) and I agree: Threads are waiting for the semaphore and in awaitJoin() in the F/J pool.

This framework has serious problems where join() is concerned. I’ve been writing a critique about this framework for four years now. The basic join problem starts here.

awaitJoin() has similar problems. You can peruse the code yourself. When the framework gets to the bottom of the work deque it issues a wait(). What it all comes down to is that this framework has no way of doing a context switch.

There is a way of getting this framework to create compensation threads for the threads that are stalled. You need to implement the ForkJoinPool.ManagedBlocker interface. How you can do this, I have no idea. You’re running a basic API with streams. You’re not implementing the Streams API and writing your own code.
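For reference, the shape of the ForkJoinPool.ManagedBlocker interface mentioned above can be sketched as follows. The class SemaphoreBlocker and the helper acquireManaged are hypothetical names, not part of the JDK or of the original question. This is only an illustration of the API; it is an assumption, not a verified result, that it would help in the test case above. In particular, compensation threads do not change the fact that a worker already holding a permit may still end up waiting for a second one.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Semaphore;

final class SemaphoreBlocker implements ForkJoinPool.ManagedBlocker {

    private final Semaphore semaphore;
    private boolean acquired;

    SemaphoreBlocker(Semaphore semaphore) {
        this.semaphore = semaphore;
    }

    @Override
    public boolean isReleasable() {
        // Try the non-blocking path first: no blocking is needed if a permit is free.
        if (!acquired) {
            acquired = semaphore.tryAcquire();
        }
        return acquired;
    }

    @Override
    public boolean block() throws InterruptedException {
        // Called by the pool when blocking is necessary.
        if (!acquired) {
            semaphore.acquire();
            acquired = true;
        }
        return true; // no further blocking is required
    }

    /** Acquires one permit while telling the pool that the current thread may block. */
    static void acquireManaged(Semaphore semaphore) throws InterruptedException {
        ForkJoinPool.managedBlock(new SemaphoreBlocker(semaphore));
    }

    public static void main(String[] args) throws InterruptedException {
        Semaphore permits = new Semaphore(2);
        acquireManaged(permits);
        try {
            System.out.println("permits left while working: " + permits.availablePermits());
        } finally {
            permits.release();
        }
    }
}
```

In a stream worker, acquireManaged(concurrentExecutions) would take the place of acquireUninterruptibly(), with the InterruptedException handled by the caller.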

I stick to my comment, above: Once you turn over the parallelism to the API you relinquish your ability to control the inner workings of that parallel mechanism. There is no bug with the API (other than it is using a faulty framework for parallel operations.) The problem is that semaphores or any other method for controlling parallelism within the API are hazardous ideas.
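One way to limit the outer concurrency without blocking inside stream workers is to keep the limit outside the Streams API. The sketch below is an assumption about a possible restructuring, not something proposed in the original thread: the outer loop runs on a dedicated fixed-size executor, and only the inner loop uses a parallel stream. The names BoundedOuterLoop and run are illustrative, and the inner work is reduced to a counter increment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

final class BoundedOuterLoop {

    /**
     * Runs outerTasks tasks on at most maxConcurrent threads. Each outer task runs
     * an inner parallel stream. Returns the total number of inner tasks executed.
     */
    static int run(int outerTasks, int innerTasks, int maxConcurrent) throws Exception {
        AtomicInteger executed = new AtomicInteger();
        ExecutorService outerPool = Executors.newFixedThreadPool(maxConcurrent);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < outerTasks; i++) {
                futures.add(outerPool.submit(() -> {
                    // The inner loop still uses the common ForkJoinPool.
                    IntStream.range(0, innerTasks).parallel().forEach(j -> executed.incrementAndGet());
                }));
            }
            for (Future<?> future : futures) {
                future.get(); // wait for completion and propagate failures
            }
            return executed.get();
        } finally {
            outerPool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("inner tasks executed: " + run(20, 100, 5));
    }
}
```

Because the outer threads are not ForkJoinPool workers, no pool worker ever blocks waiting for a permit; the size of the executor plays the role of the semaphore. The inner streams of all outer tasks still share the common pool.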

edharned answered Oct 21 '22 15:10