Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Phaser Vs CyclicBarrier in the context of Fork/Join

While trying to understand the differences between Phaser and CyclicBarrier I have come across some links Difference between Phaser and CyclicBarrier and https://www.infoq.com/news/2008/07/phasers/ I read that the Phaser is compatible with Fork/Join interface while CyclicBarrier is not, here is a code to demonstrate this:

Phaser

 public static void main(String[] args) throws InterruptedException {

        CountDownLatch countDownLatch = new CountDownLatch(1);

        Phaser phaser = new Phaser(16){
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                return phase ==1 || super.onAdvance(phase, registeredParties);
            }
        };

        System.out.println("Available Processors: "+Runtime.getRuntime().availableProcessors());

        ExecutorService executorService = ForkJoinPool.commonPool(); // Runtime.getRuntime().availableProcessors() -1

        for (int i = 0; i < 16; i++) {
            final int count = 0;
            executorService.submit(() -> {
                while (!phaser.isTerminated()) {
                    try {
                        Thread.sleep(ThreadLocalRandom.current().nextInt(300, 2000));
                        System.out.println(Thread.currentThread().getName() + count + " ... ");
                        phaser.arriveAndAwaitAdvance();
                        System.out.println(Thread.currentThread().getName() + count + " ... continues ... ");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
    }

CyclicBarrier

public static void main(String[] args) throws InterruptedException {

        AtomicInteger phases = new AtomicInteger();
        CountDownLatch  countDownLatch = new CountDownLatch(1);
        CyclicBarrier cyclicBarrier = new CyclicBarrier(16, () -> phases.incrementAndGet());

        ExecutorService executorService = ForkJoinPool.commonPool();

        for (int i = 0; i < 16; i++) {
            executorService.submit(() -> {
                while (phases.get() < 1) {
                    try {
                        Thread.sleep(ThreadLocalRandom.current().nextInt(300, 2000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    try {
                        System.out.println(Thread.currentThread().getName() + " Ok, I am waiting ");

                        cyclicBarrier.await();

                        System.out.println(Thread.currentThread().getName() + " continued it's way ... ");
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();
    }

Explanation:

The two codes runs a fork/join thread pool, this mean that the threads are daemon threads and this is why I use CountDownLatch. The method commonPool() will create a thread pool with threads equal to Runtime.getRuntime().availableProcessors(), mine are 12, so it will create 12 threads. Both Phaser and CyclicBarrier in the two examples define 16 parties i.e they need 16 calls to await(), in the cyclic barrier, and arriveAndAwaitAdvance() in the Phaser, to move on.

In the example with the phaser, when the 12th thread blocks the fork/join will spawn more threads, it will create more threads, hence the phaser will eventually terminate. However, with CyclicBarrier when the 12th threads reach await() the program stops and never advances, it hangs. Obviously, because the barrier needs 16 calls, to make the threads advances, and only 12 are made by the created threads. The thread pool will not create more thread to advance the CyclicBarrier as it does with the Phaser.

The question:

How does the fork/join manages to create more threads with the Phaser but not with the CyclicBarrier? Why the methods arriveAndAwaitAdvance() made the thread pool create new threads, and how, but metho await() did not cause the threadpool to create more threads?

like image 888
Adelin Avatar asked Jul 28 '19 11:07

Adelin


People also ask

What is the difference between a CountDownLatch and a CyclicBarrier?

As stated in the definitions, CyclicBarrier allows a number of threads to wait on each other, whereas CountDownLatch allows one or more threads to wait for a number of tasks to complete. In short, CyclicBarrier maintains a count of threads whereas CountDownLatch maintains a count of tasks.

What is phaser java?

public class Phaser extends Object. A reusable synchronization barrier, similar in functionality to CyclicBarrier and CountDownLatch but supporting more flexible usage. Registration. Unlike the case for other barriers, the number of parties registered to synchronize on a phaser may vary over time.

What is CyclicBarrier Java?

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other.

How to create a phaser instance from the main thread?

When creating Phaser instance from the main thread, we're passing 1 as an argument. This is equivalent to calling the register () method from the current thread. We're doing this because, when we're creating three worker threads, the main thread is a coordinator, and therefore the Phaser needs to have four threads registered to it:

What is phaser in Java?

In this article, we will be looking at the Phaser construct from the java.util.concurrent package. It is a very similar construct to the CountDownLatch that allows us to coordinate execution of threads.

How can a thread participate in phaser coordination?

To participate in the coordination, the thread needs to register () itself with the Phaser instance. Note that this only increases the number of registered parties, and we can't check whether the current thread is registered – we'd have to subclass the implementation to supports this.

What is cyclicbarrier in Java?

In other words, a CyclicBarrier is used when multiple thread carry out different sub tasks and the output of these sub tasks need to be combined to form the final output. After completing its execution, threads call await () method and wait for other threads to reach the barrier.


1 Answers

The Phaser is able to do this because it internally calls ForkJoinPool.managedBlock(ManagedBlocker) when blocking a thread.

This API of the ForkJoinPool is accessible to anyone, so you can easily enhance your CyclicBarrier version to use it, and remove thread starvation. For example with something in the vibe of:

ForkJoinPool.managedBlock(new ManagedBlocker() {

    boolean isReleasable = false;

    @Override
    public boolean block() throws InterruptedException {
        try {
            cyclicBarrier.await();
        } catch (BrokenBarrierException aE) {
            throw new IllegalStateException(aE);
        }
        return isReleasable = true;
    }

    @Override
    public boolean isReleasable() {
        return isReleasable;
    }
});
like image 182
Alexandre de Champeaux Avatar answered Oct 16 '22 13:10

Alexandre de Champeaux