Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

In Java 8, does Executors.newWorkStealingPool() also provide a task queue?

Is there a queue of pending tasks used in conjunction with Java 8's Executors.newWorkStealingPool()?

For example, suppose the # available cores is 2, and Executors.newWorkStealingPool() is empty because 2 tasks are already running. Then what happens if a 3rd task is submitted to the work-stealing executor? Is it queued? And if it is, what are the bounds if any on said queue?

Thanks in advance.

like image 581
Kode Charlie Avatar asked Feb 24 '16 00:02

Kode Charlie


People also ask

What is executor in Java 8?

Interface ExecutorAn object that executes submitted Runnable tasks. This interface provides a way of decoupling task submission from the mechanics of how each task will be run, including details of thread use, scheduling, etc. An Executor is normally used instead of explicitly creating threads.

What is newWorkStealingPool in Java?

The newWorkStealingPool() method of Executors class creates a work-stealing thread pool using the number of available processors as its target parallelism level.

What is task executor in Java?

An Executor that provides methods to manage termination and methods that can produce a Future for tracking progress of one or more asynchronous tasks. An ExecutorService can be shut down, which will cause it to reject new tasks. Two different methods are provided for shutting down an ExecutorService .

How does executor framework work in Java?

The Executor Framework contains a bunch of components that are used to efficiently manage multiple threads. It was released with the JDK 5 which is used to run the Runnable objects without creating new threads every time and also mostly re-using the already created threads.


2 Answers

Is there a queue of pending tasks used in conjunction with Java 8's Executors.newWorkStealingPool()?

Yes, every thread is backed with it's own deque. When one thread is done with it's tasks it takes task from other thread's deque and executes it.

And if it is, what are the bounds if any on said queue?

Maximum size for the queues is limited by the number: static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

When the queue is full an unchecked exception is thrown: RejectedExecutionException("Queue capacity exceeded")

like image 51
Anton Antonenko Avatar answered Sep 27 '22 19:09

Anton Antonenko


From grepcode of Executors and ForkJoinPool

Executors.newWorkStealingPool returns ForkJoinPool

Executors:

 public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

ForkJoinPool:

public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        this(checkParallelism(parallelism),
             checkFactory(factory),
             handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
             "ForkJoinPool-" + nextPoolId() + "-worker-");
        checkPermission();
    }

On execute() :

public void execute(ForkJoinTask<?> task) {
        if (task == null)
            throw new NullPointerException();
        externalPush(task);
    }

externalPush calls externalSubmit and you can see WorkQueue details in that implementation.

externalSubmit:

// External operations

/**
 * Full version of externalPush, handling uncommon cases, as well
 * as performing secondary initialization upon the first
 * submission of the first task to the pool.  It also detects
 * first submission by an external thread and creates a new shared
 * queue if the one at index if empty or contended.
 *
 * @param task the task. Caller must ensure non-null.

 */

You can find more details about queue sizes in WorkQueue class

 static final class WorkQueue {

Documentation on WokrQueue:

/**
     * Queues supporting work-stealing as well as external task
     * submission. See above for descriptions and algorithms.
     * Performance on most platforms is very sensitive to placement of
     * instances of both WorkQueues and their arrays -- we absolutely
     * do not want multiple WorkQueue instances or multiple queue
     * arrays sharing cache lines. The @Contended annotation alerts
     * JVMs to try to keep instances apart.
     */
    @sun.misc.Contended

 /**
     * Capacity of work-stealing queue array upon initialization.
     * Must be a power of two; at least 4, but should be larger to
     * reduce or eliminate cacheline sharing among queues.
     * Currently, it is much larger, as a partial workaround for
     * the fact that JVMs often place arrays in locations that
     * share GC bookkeeping (especially cardmarks) such that
     * per-write accesses encounter serious memory contention.
     */
    static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

    /**
     * Maximum size for queue arrays. Must be a power of two less
     * than or equal to 1 << (31 - width of array entry) to ensure
     * lack of wraparound of index calculations, but defined to a
     * value a bit less than this to help users trap runaway
     * programs before saturating systems.
     */
    static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
like image 36
Ravindra babu Avatar answered Sep 27 '22 21:09

Ravindra babu