Java ForkJoinPool - order of tasks in queues

I would like to understand the order in which tasks are processed in Java fork-join pool.

So far, the only relevant information I've found in the docs is about a parameter called the "asyncMode", which is "true if this pool uses local first-in-first-out scheduling mode for forked tasks that are never joined".

My interpretation of this statement is that every worker has its own task queue. Workers take tasks from the front of their own queue, or steal from the backs of other workers' queues if their own queue is empty. Workers add newly-forked tasks to the back of their own queue if asyncMode is true, and to the front if asyncMode is false.

Please correct me if my interpretation is wrong!
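
For reference, asyncMode is fixed when the pool is constructed and can be read back with getAsyncMode(). A minimal sketch, assuming a hypothetical helper class AsyncModeDemo; the parallelism of 2 is arbitrary:

```java
import java.util.concurrent.ForkJoinPool;

class AsyncModeDemo {
    // Builds a pool with the given parallelism and scheduling mode.
    // (Hypothetical helper, not part of the JDK.)
    static ForkJoinPool newPool(int parallelism, boolean asyncMode) {
        return new ForkJoinPool(
                parallelism,
                ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                null,        // no custom UncaughtExceptionHandler
                asyncMode);  // true = local FIFO mode, false = default LIFO mode
    }

    public static void main(String[] args) {
        ForkJoinPool lifo = newPool(2, false);
        ForkJoinPool fifo = newPool(2, true);
        System.out.println("default pool asyncMode: " + lifo.getAsyncMode());
        System.out.println("async pool asyncMode:   " + fifo.getAsyncMode());
        lifo.shutdown();
        fifo.shutdown();
    }
}
```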

Now, this raises a couple of questions:

1) What is the ordering for forked tasks that are joined?

My guess is that, when a task is forked, it is added to the worker's queue as described in my interpretation above. Now, suppose the task is joined...

  • If, when join is called, the task has not yet been started, the worker calling join will pull the task out of the queue and start working on it immediately.

  • If, when join is called, the task has already been stolen by another worker, then the worker calling join will work on other tasks in the meantime (following the ordering for getting tasks described in my interpretation above), until the task that it is joining has been finished by the worker that stole it.

This guess is based on writing simple test code with print statements, and observing the way in which changing the order of join calls influences the order in which tasks are processed. Could someone please tell me if my guess is correct?
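
A sketch of the kind of test described above, for anyone who wants to reproduce the experiment. The names JoinOrderProbe, Leaf and run are hypothetical. It uses a single worker so that no stealing takes place, forks three tasks, joins them in a chosen order and records the order in which they actually ran. The recorded order may differ between JDK versions, so no particular output is claimed here.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class JoinOrderProbe {
    // Leaf task that records its name when it runs.
    static final class Leaf extends RecursiveAction {
        final String name;
        final List<String> log;
        Leaf(String name, List<String> log) { this.name = name; this.log = log; }
        @Override protected void compute() { log.add(name); }
    }

    // Forks A, B, C (in that order) from inside the pool, then joins them
    // in the order given by joinOrder, e.g. "ABC" or "CBA".
    static List<String> run(boolean asyncMode, String joinOrder) {
        List<String> log = new CopyOnWriteArrayList<>();
        ForkJoinPool pool = new ForkJoinPool(
                1, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, asyncMode);
        try {
            pool.invoke(new RecursiveAction() {
                @Override protected void compute() {
                    Leaf[] tasks = {
                        new Leaf("A", log), new Leaf("B", log), new Leaf("C", log)
                    };
                    for (Leaf t : tasks) t.fork();
                    for (char c : joinOrder.toCharArray()) tasks[c - 'A'].join();
                }
            });
        } finally {
            pool.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        for (boolean async : new boolean[] {false, true}) {
            for (String order : new String[] {"ABC", "CBA"}) {
                System.out.println("asyncMode=" + async + ", join order " + order
                        + ", execution order " + run(async, order));
            }
        }
    }
}
```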

2) What is the ordering for tasks that are submitted externally?

According to the answer to this question, fork-join pools do not use external queues. (I'm using Java 8, by the way.)

So am I to understand that when a task is submitted externally, the task is added to a randomly-selected worker queue?

If so, is the externally-submitted task added to the back or the front of the queue?

Finally, does this depend on whether the task is submitted by calling pool.execute(task) or by calling pool.invoke(task)? And does this depend on whether the thread calling pool.execute(task) or pool.invoke(task) is an external thread or a thread within this fork-join pool?

asked Sep 20 '18 09:09 by Kenny Wong


1 Answer

  1. Your guess is correct. This is described in the "Implementation overview" comment in the ForkJoinPool source:
 * Joining Tasks
 * =============
 *
 * Any of several actions may be taken when one worker is waiting
 * to join a task stolen (or always held) by another.  Because we
 * are multiplexing many tasks on to a pool of workers, we can't
 * just let them block (as in Thread.join).  We also cannot just
 * reassign the joiner's run-time stack with another and replace
 * it later, which would be a form of "continuation", that even if
 * possible is not necessarily a good idea since we may need both
 * an unblocked task and its continuation to progress.  Instead we
 * combine two tactics:
 *
 *   Helping: Arranging for the joiner to execute some task that it
 *      would be running if the steal had not occurred.
 *
 *   Compensating: Unless there are already enough live threads,
 *      method tryCompensate() may create or re-activate a spare
 *      thread to compensate for blocked joiners until they unblock.
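
One practical consequence of this design is that a task may fork subtasks and join them without tying up the pool, even when the recursion is much deeper than the number of workers. A minimal sketch, assuming a hypothetical RangeSum task with an arbitrary threshold. Whether a given join runs the subtask locally, helps, or triggers compensation cannot be observed from this code:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class RangeSum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // arbitrary cut-off for this sketch
    private final long lo, hi;                  // half-open range [lo, hi)

    RangeSum(long lo, long hi) { this.lo = lo; this.hi = hi; }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) {
            long sum = 0;
            for (long i = lo; i < hi; i++) sum += i;
            return sum;
        }
        long mid = lo + (hi - lo) / 2;
        RangeSum left = new RangeSum(lo, mid);
        RangeSum right = new RangeSum(mid, hi);
        left.fork();                     // queued on this worker; may be stolen
        long rightSum = right.compute(); // run the other half directly
        return left.join() + rightSum;   // joiner does not simply block the worker
    }

    // Sums 0 + 1 + ... + (n - 1) on a fresh pool with the given parallelism.
    static long sum(long n, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.invoke(new RangeSum(0, n));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sum(100_000, 2));
    }
}
```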

  2. ForkJoinPool.invoke and ForkJoinPool.execute submit the task in exactly the same way. The only difference is that invoke then waits for the result by calling task.join(). You can see this in the code:

    public <T> T invoke(ForkJoinTask<T> task) {
        if (task == null)
            throw new NullPointerException();
        externalPush(task);
        return task.join();
    }
    public void execute(ForkJoinTask<?> task) {
        if (task == null)
            throw new NullPointerException();
        externalPush(task);
    }

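In externalPush you can see that the target queue is picked using the calling thread's ThreadLocalRandom probe value (r), a pseudo-random per-thread number, masked with SQMASK. The task is then stored at the top of that queue (q.top), which is the end a push writes to. If the queue is missing, locked or full, the call falls back to externalSubmit.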

    final void externalPush(ForkJoinTask<?> task) {
        WorkQueue[] ws; WorkQueue q; int m;
        int r = ThreadLocalRandom.getProbe();
        int rs = runState;
        if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
            (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
            U.compareAndSwapInt(q, QLOCK, 0, 1)) {
            ForkJoinTask<?>[] a; int am, n, s;
            if ((a = q.array) != null &&
                (am = a.length - 1) > (n = (s = q.top) - q.base)) {
                int j = ((am & s) << ASHIFT) + ABASE;
                U.putOrderedObject(a, j, task);
                U.putOrderedInt(q, QTOP, s + 1);
                U.putIntVolatile(q, QLOCK, 0);
                if (n <= 1)
                    signalWork(ws, q);
                return;
            }
            U.compareAndSwapInt(q, QLOCK, 1, 0);
        }
        externalSubmit(task);
    }
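
From the caller's side, the two entry points can be compared as follows. This is a sketch with hypothetical names (ExternalSubmitDemo, Answer); it shows only the public API and says nothing about which internal queue the task lands in:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class ExternalSubmitDemo {
    // Trivial task used only to have something to submit.
    static final class Answer extends RecursiveTask<Integer> {
        @Override protected Integer compute() { return 42; }
    }

    // invoke submits the task and waits for its result.
    static int viaInvoke(ForkJoinPool pool) {
        return pool.invoke(new Answer());
    }

    // execute only submits; the caller joins explicitly if it needs the result.
    static int viaExecute(ForkJoinPool pool) {
        Answer task = new Answer();
        pool.execute(task);
        return task.join();
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            System.out.println("invoke:  " + viaInvoke(pool));
            System.out.println("execute: " + viaExecute(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```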

I am not sure what you mean by this:

And does this depend on whether the thread calling pool.execute(task) or pool.invoke(task) is an external thread or a thread within this fork-join pool?

answered Nov 10 '22 19:11 by Gal S