
Ideas to avoid doing a trick on CyclicBarrier

I was running some tests with parallel processing and wrote a program that, given a matrix of integers, recalculates each position's value based on its neighbours.

I needed a copy of the matrix so the values wouldn't be overwritten, and I used a CyclicBarrier to merge the results once the partial problems were solved:

CyclicBarrier cyclic_barrier = new CyclicBarrier(n_tasks + 1, new Runnable() {
    public void run() {
        ParallelProcess.mergeResult();
    }
});
ParallelProcess p = new ParallelProcess(cyclic_barrier, n_rows, r_cols); // init

Each task is assigned a portion of the matrix: I'm splitting it into equal pieces by rows. But the division may not be exact, in which case a small piece made up of the last rows would not be submitted to the thread pool.

Example: if I have 16 rows and n_tasks = 4, there is no problem: all 4 chunks are submitted to the pool. But if I have 18 rows instead, the first 16 rows are submitted and the last two are not.
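To make the arithmetic concrete, here is a minimal sketch of the split described above. The class and method names (RowSplit, lowerLimitAfterEqualChunks) are hypothetical and not part of the original program; the sketch only assumes that the chunk size is computed with integer division.

```java
class RowSplit {
    /** Returns the first row index NOT covered by nTasks equal-sized chunks. */
    static int lowerLimitAfterEqualChunks(int nRows, int nTasks) {
        int chunk = nRows / nTasks; // integer division drops the remainder
        return chunk * nTasks;      // rows [0, chunk * nTasks) go to the pool
    }

    public static void main(String[] args) {
        int nTasks = 4;
        for (int nRows : new int[] {16, 18}) {
            int lowerLimit = lowerLimitAfterEqualChunks(nRows, nTasks);
            // A leftover chunk exists exactly when lowerLimit != nRows.
            System.out.println(nRows + " rows: pool covers rows 0.." + (lowerLimit - 1)
                    + ", leftover rows = " + (nRows - lowerLimit));
        }
    }
}
```

With 16 rows nothing is left over; with 18 rows the last two rows fall outside the four equal chunks.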

So I force the leftover chunk to run when this happens. I'm not actually submitting it to the pool, though: I use a fixed thread pool created with ExecutorService e = Executors.newFixedThreadPool(n_tasks), and since every thread in the pool is occupied and blocked at the barrier (mybarrier.await() is called in the run method), a task submitted to the pool would never start. So I just use Thread.start().

Now to the point. Since the CyclicBarrier has to account for the possibility of that leftover chunk, the number of parties must be incremented by 1.

But when there is no leftover chunk, I am one party short of tripping the barrier.

My solution:

if (lower_limit != n_rows) { // a leftover chunk remains to be processed
    Thread t = new Thread(new ParallelProcess(lower_limit, n_rows));
    t.start();
    t.join();
} else {
    // no leftover chunk: the main thread acts as the extra party
    cyclic_barrier.await();
}
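For reference, the whole arrangement can be reproduced in a small self-contained sketch. This is not the original program: BarrierTrickDemo, runOnce and worker are hypothetical names, the row processing is omitted, and a counter stands in for ParallelProcess.mergeResult(). It only shows that the barrier action fires exactly once in both cases.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierTrickDemo {
    /** A task that would process its rows and then wait at the barrier. */
    static Runnable worker(CyclicBarrier barrier) {
        return () -> {
            try {
                // ... process the assigned rows here ...
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                Thread.currentThread().interrupt();
            }
        };
    }

    /** Runs one round and returns how many times the merge action fired. */
    static int runOnce(int nRows, int nTasks) throws Exception {
        AtomicInteger merges = new AtomicInteger(); // stand-in for mergeResult()
        CyclicBarrier barrier = new CyclicBarrier(nTasks + 1, merges::incrementAndGet);
        ExecutorService pool = Executors.newFixedThreadPool(nTasks);

        int chunk = nRows / nTasks;
        int lowerLimit = 0;
        for (int i = 0; i < nTasks; i++) {
            pool.submit(worker(barrier));
            lowerLimit += chunk;
        }

        if (lowerLimit != nRows) {       // leftover chunk: run it on its own thread
            Thread t = new Thread(worker(barrier));
            t.start();
            t.join();
        } else {                         // no leftover: main is the extra party
            barrier.await();
        }

        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return merges.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("16 rows, merges = " + runOnce(16, 4));
        System.out.println("18 rows, merges = " + runOnce(18, 4));
    }
}
```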

I feel like I am cheating by calling cyclic_barrier.await() just to force the barrier to trip.

Is there another way to approach this problem that avoids this workaround?

dabadaba asked Jun 23 '14 17:06





1 Answer

This doesn't answer your question about CyclicBarrier directly, but may I recommend using a Phaser instead? A Phaser lets you register parties dynamically, so the number of parties doesn't have to be fixed up front, and it also lets you run mergeResult when a phase trips.

So, before you submit each asynchronous calculation, simply register a party. Then, inside the calculation, have the thread arrive at the phaser. When all registered parties have arrived, the phaser advances the phase and invokes the overridable method onAdvance.

The submission:

ParallelProcess process = new ParallelProcess(lower_limit, n_rows);
phaser.register();
executor.submit(process);

The processor:

public void run() {
    // do stuff
    phaser.arrive();
}

The phaser:

Phaser phaser = new Phaser() {
    @Override
    protected boolean onAdvance(int phase, int registeredParties) {
        ParallelProcess.mergeResult();
        return true; // returning true terminates the phaser after this phase
    }
};
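
Putting the three pieces together, a self-contained sketch might look like the following. The names PhaserMergeDemo and runOnce are hypothetical, and two counters stand in for the real row processing and for ParallelProcess.mergeResult(). One deliberate difference from the snippets above, which is my assumption rather than part of the original suggestion: the submitting thread registers itself as a party (new Phaser(1)) and arrives last, because otherwise a fast task could arrive before the next one is registered and trip the phase early.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PhaserMergeDemo {
    /** Returns {number of merges, number of rows processed}. */
    static int[] runOnce(int nRows, int nTasks) throws InterruptedException {
        AtomicInteger merges = new AtomicInteger(); // stand-in for mergeResult()
        AtomicInteger rows = new AtomicInteger();   // stand-in for real row work

        Phaser phaser = new Phaser(1) {             // 1 = the submitting thread
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                merges.incrementAndGet();
                return true;                        // single phase, then terminate
            }
        };

        ExecutorService executor = Executors.newFixedThreadPool(nTasks);
        int chunk = Math.max(1, nRows / nTasks);
        // The leftover chunk is just one more iteration: arrive() never blocks,
        // so the extra task runs as soon as a pool thread becomes free.
        for (int lower = 0; lower < nRows; lower += chunk) {
            final int from = lower;
            final int to = Math.min(lower + chunk, nRows);
            phaser.register();                      // one party per submitted chunk
            executor.submit(() -> {
                rows.addAndGet(to - from);          // "process" rows [from, to)
                phaser.arrive();
            });
        }

        phaser.arriveAndAwaitAdvance();             // wait until every chunk arrived
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return new int[] {merges.get(), rows.get()};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] r = runOnce(18, 4);
        System.out.println("merges = " + r[0] + ", rows processed = " + r[1]);
    }
}
```

Because the tasks call arrive() rather than blocking at a barrier, the pool threads are freed as soon as their chunk is done, so no separate Thread is needed for the leftover rows.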
John Vint answered Oct 28 '22 22:10
