
Flexible CountDownLatch?

I have encountered a problem twice now whereby a producer thread produces N work items, submits them to an ExecutorService and then needs to wait until all N items have been processed.

Caveats

  • N is not known in advance. If it were, I would simply create a CountDownLatch and have the producer thread await() until all work was complete.
  • Using a CompletionService is inappropriate because although my producer thread needs to block (i.e. by calling take()) there's no way of signalling that all work is complete, to cause the producer thread to stop waiting.

My current favoured solution is to use an integer counter, incrementing it whenever a work item is submitted and decrementing it whenever a work item has been processed. After submitting all N tasks, my producer thread will need to wait on a lock, checking whether counter == 0 each time it is notified. A consumer thread will need to notify the producer whenever it decrements the counter and the new value is 0.
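For reference, the counter described above could be sketched roughly as follows. This is only an illustration under assumptions: the names PendingCounter and PendingCounterDemo, the pool size of 4 and the empty task body are all made up for the example and are not part of the question.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper: tracks outstanding work items and lets a producer wait for zero.
class PendingCounter {
    private int pending;                          // guarded by "this"

    synchronized void increment() {               // call BEFORE submitting a work item
        pending++;
    }

    synchronized void decrement() {               // call when a work item has finished
        if (--pending == 0) {
            notifyAll();                          // wake the waiting producer
        }
    }

    synchronized void awaitZero() throws InterruptedException {
        while (pending != 0) {                    // loop guards against spurious wakeups
            wait();
        }
    }

    synchronized int pending() {
        return pending;
    }
}

class PendingCounterDemo {
    // Submits n tasks, waits for all of them, and returns the remaining count.
    static int submitAndWait(int n) {
        ExecutorService executor = Executors.newFixedThreadPool(4); // assumed pool size
        PendingCounter counter = new PendingCounter();
        try {
            for (int i = 0; i < n; i++) {
                counter.increment();              // increment on the producer thread
                executor.execute(() -> {
                    try {
                        // ... process the work item (placeholder) ...
                    } finally {
                        counter.decrement();      // decrement even if the work fails
                    }
                });
            }
            counter.awaitZero();                  // only after ALL items are submitted
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            executor.shutdown();
        }
        return counter.pending();
    }
}
```

Note that the counter can touch zero transiently while the producer is still submitting, so in this sketch the producer must not start waiting until its last item has been submitted.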

Is there a better approach to this problem or is there a suitable construct in java.util.concurrent I should be using rather than "rolling my own"?

Thanks in advance.

Adamski asked Oct 28 '09 09:10



2 Answers

java.util.concurrent.Phaser looks like it would work well for you. It is planned for release in Java 7, but the most stable version can be found at the jsr166 interest group website.

The Phaser is a glorified CyclicBarrier. You can register N parties and, when you're ready, await their advance at a specific phase.

A quick example on how it would work:

    final Phaser phaser = new Phaser();

    public Runnable getRunnable() {
        return new Runnable() {
            public void run() {
                // ..do stuff...
                phaser.arriveAndDeregister();
            }
        };
    }

    public void doWork() {
        phaser.register(); // register self
        for (int i = 0; i < N; i++) {
            phaser.register(); // register this task prior to execution
            executor.submit(getRunnable());
        }
        phaser.arriveAndAwaitAdvance();
    }
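A self-contained variant of the same idea, as a sketch only: the class name PhaserExample, the pool size and the AtomicInteger standing in for real work are assumptions added for illustration. It uses the Phaser that ships in java.util.concurrent from Java 7 onward, plus Java 8 lambdas for brevity.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

class PhaserExample {
    // Submits n tasks without creating a latch up front and returns how many ran.
    static int runAll(int n) {
        ExecutorService executor = Executors.newFixedThreadPool(4); // assumed pool size
        Phaser phaser = new Phaser(1);               // 1 party = the producer itself
        AtomicInteger processed = new AtomicInteger();
        try {
            for (int i = 0; i < n; i++) {
                phaser.register();                   // register the task before submitting it
                executor.execute(() -> {
                    try {
                        processed.incrementAndGet(); // stand-in for real work
                    } finally {
                        phaser.arriveAndDeregister(); // arrive even if the work fails
                    }
                });
            }
            // The producer is the last unarrived party, so the phase cannot
            // advance until it arrives here and every task has arrived too.
            phaser.arriveAndAwaitAdvance();
        } finally {
            executor.shutdown();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(runAll(20));
    }
}
```

Because the producer stays registered and unarrived while it submits, the loop bound does not need to be known when the Phaser is created. One caveat to keep in mind is that a single Phaser supports at most 65535 registered parties.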
John Vint answered Sep 22 '22 23:09


You could of course use a CountDownLatch protected by an AtomicReference so that your tasks get wrapped thus:

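    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicReference;

    public class MyTask implements Runnable {
        private final Runnable r;
        private final AtomicReference<CountDownLatch> l;

        public MyTask(Runnable r, AtomicReference<CountDownLatch> l) { this.r = r; this.l = l; }

        public void run() {
            r.run();
            try {
                while (l.get() == null) Thread.sleep(1000L); // wait until the latch is set
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and give up
                return;
            }
            l.get().countDown();
        }
    }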

Notice that the tasks run their work and then spin until the count-down is set (i.e. until the total number of tasks is known). As soon as the count-down is set, they count it down and exit. These get submitted as follows:

    AtomicReference<CountDownLatch> l = new AtomicReference<CountDownLatch>();
    executor.submit(new MyTask(r, l));

After the point of creation/submission of your work, when you know how many tasks you have created:

    l.set(new CountDownLatch(nTasks));
    l.get().await();
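Put together end to end, the approach might look like the sketch below. The class name DeferredLatchExample, the pool size, the 10 ms poll interval and the AtomicInteger standing in for real work are assumptions for illustration, not part of the answer above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class DeferredLatchExample {
    // Submits nTasks tasks, publishes the latch afterwards, and returns how many ran.
    static int runAll(int nTasks) {
        ExecutorService executor = Executors.newFixedThreadPool(4); // assumed pool size
        AtomicReference<CountDownLatch> latchRef = new AtomicReference<>();
        AtomicInteger processed = new AtomicInteger();
        try {
            for (int i = 0; i < nTasks; i++) {
                executor.execute(() -> {
                    processed.incrementAndGet();         // stand-in for real work
                    try {
                        while (latchRef.get() == null) { // latch not published yet
                            Thread.sleep(10L);           // assumed poll interval
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    latchRef.get().countDown();
                });
            }
            // Only now is the total known, so create and publish the latch.
            CountDownLatch latch = new CountDownLatch(nTasks);
            latchRef.set(latch);
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            executor.shutdown();
        }
        return processed.get();
    }
}
```

Two things are worth keeping in mind with this sketch: each finished task keeps occupying a pool thread until the latch is published, and a task whose work throws never counts down, which would leave the producer waiting.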
oxbow_lakes answered Sep 19 '22 23:09