I have encountered a problem twice now whereby a producer thread produces N work items, submits them to an ExecutorService
and then needs to wait until all N items have been processed.
Caveats
N is not known in advance. If it were, I would simply create a CountDownLatch and then have the producer thread await() until all work was complete.
Using a CompletionService is inappropriate because, although my producer thread needs to block (i.e. by calling take()), there's no way of signalling that all work is complete, to cause the producer thread to stop waiting.
My current favoured solution is to use an integer counter, incrementing it whenever an item of work is submitted and decrementing it when a work item is processed. Following the submission of all N tasks, my producer thread will need to wait on a lock, checking whether counter == 0 whenever it is notified. The consumer thread(s) will need to notify the producer if they have decremented the counter and the new value is 0.
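The counter approach described above could be sketched roughly as follows. The class and method names (PendingWorkCounter, submitAndWait) are hypothetical and chosen only for illustration, and the "work" is a stand-in increment rather than real processing.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: counts outstanding work items and lets a producer
// block until none remain.
class PendingWorkCounter {
    private final Object lock = new Object();
    private int pending; // guarded by lock

    // Call before submitting a work item.
    void increment() {
        synchronized (lock) {
            pending++;
        }
    }

    // Call when a work item finishes; wakes waiters once the count hits zero.
    void decrement() {
        synchronized (lock) {
            if (--pending == 0) {
                lock.notifyAll();
            }
        }
    }

    // Blocks until every counted item has been processed.
    void awaitZero() throws InterruptedException {
        synchronized (lock) {
            while (pending != 0) { // loop guards against spurious wakeups
                lock.wait();
            }
        }
    }
}

class PendingWorkCounterDemo {
    // Submits n trivial tasks and returns how many had run when the producer woke up.
    static int submitAndWait(int n) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        PendingWorkCounter counter = new PendingWorkCounter();
        AtomicInteger processed = new AtomicInteger();
        try {
            for (int i = 0; i < n; i++) {
                counter.increment(); // count the item before it can possibly finish
                executor.submit(() -> {
                    try {
                        processed.incrementAndGet(); // stand-in for real work
                    } finally {
                        counter.decrement(); // always decrement, even on failure
                    }
                });
            }
            counter.awaitZero();
            return processed.get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(submitAndWait(10));
    }
}
```

Incrementing before the submit call matters here: if the increment happened inside the task, the counter could touch zero while the producer was still submitting.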
Is there a better approach to this problem, or is there a suitable construct in java.util.concurrent I should be using rather than "rolling my own"?
Thanks in advance.
As stated in the definitions, CyclicBarrier allows a number of threads to wait on each other, whereas CountDownLatch allows one or more threads to wait for a number of tasks to complete. In short, CyclicBarrier maintains a count of threads whereas CountDownLatch maintains a count of tasks.
CountDownLatch is used to make sure that a task waits for other threads before it starts. To understand its application, let us consider a server where the main task can only start when all the required services have started.
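The server scenario might look something like the sketch below. The service names and the startAll helper are made up for illustration, and "starting" a service is reduced to recording its name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ServiceStartupDemo {
    // Starts each named service on a pool thread and returns the names that
    // had started by the time the main task was allowed to proceed.
    static List<String> startAll(List<String> serviceNames) throws InterruptedException {
        CountDownLatch started = new CountDownLatch(serviceNames.size());
        List<String> running = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            for (String name : serviceNames) {
                pool.submit(() -> {
                    running.add(name);   // stand-in for real startup work
                    started.countDown(); // one fewer service to wait for
                });
            }
            started.await(); // the main task blocks here until the count reaches zero
            return running;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(startAll(Arrays.asList("cache", "database", "messaging")));
    }
}
```

Note that the latch count has to be known when the latch is created, which is exactly the constraint the question runs into.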
The CountDownLatch class is a synchronization aid that allows one or more threads to wait until a set of operations being performed by other threads has completed. A CountDownLatch is initialized with a count of the operations that must complete before the waiting thread may proceed.
A CountDownLatch cannot be reused: once the count reaches zero it cannot be reset. A CyclicBarrier can be reused after the waiting threads are released.
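The difference in reusability can be seen in a small sketch like the one below; the class and method names are hypothetical. One CyclicBarrier is tripped several times in a row by the same two threads, while a CountDownLatch that has reached zero simply stays at zero.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class ReuseDemo {
    // Runs `rounds` rendezvous of two threads on a single CyclicBarrier and
    // returns how many times the barrier tripped.
    static int barrierTrips(int rounds) throws InterruptedException {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once each time both parties have arrived.
        CyclicBarrier barrier = new CyclicBarrier(2, trips::incrementAndGet);
        Runnable party = () -> {
            try {
                for (int i = 0; i < rounds; i++) {
                    barrier.await(); // the same barrier is reused every round
                }
            } catch (InterruptedException | BrokenBarrierException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread a = new Thread(party);
        Thread b = new Thread(party);
        a.start();
        b.start();
        a.join();
        b.join();
        return trips.get();
    }

    // Counts a one-shot latch down past zero and returns the final count.
    static long latchCountAfterExtraCountDowns() {
        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown();
        latch.countDown(); // no effect: the count stays at zero
        return latch.getCount();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(barrierTrips(3));
        System.out.println(latchCountAfterExtraCountDowns());
    }
}
```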
java.util.concurrent.Phaser looks like it would work well for you. It is planned to be released in Java 7, but the most stable version can be found at the jsr166 interest group website.
The Phaser is a glorified CyclicBarrier. You can register N parties and, when you're ready, await their advance at a specific phase.
A quick example of how it would work:
final Phaser phaser = new Phaser();

public Runnable getRunnable() {
    return new Runnable() {
        public void run() {
            // ..do stuff...
            phaser.arriveAndDeregister();
        }
    };
}

public void doWork() {
    phaser.register(); // register self
    for (int i = 0; i < N; i++) {
        phaser.register(); // register this task prior to execution
        executor.submit(getRunnable());
    }
    phaser.arriveAndAwaitAdvance();
}
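For anyone who wants to try the Phaser idea end to end, here is a self-contained sketch under a few assumptions: a fixed pool of four threads, a trivial counter increment standing in for the real work, and a hypothetical submitAndWait method. It registers the producer through the constructor argument rather than a separate register() call, which is equivalent.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

class PhaserWaitDemo {
    // Submits n trivial tasks and returns how many had run when the producer was released.
    static int submitAndWait(int n) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        Phaser phaser = new Phaser(1); // 1 = the producer itself is a registered party
        AtomicInteger processed = new AtomicInteger();
        try {
            for (int i = 0; i < n; i++) {
                phaser.register(); // register the task before submitting it
                executor.submit(() -> {
                    try {
                        processed.incrementAndGet(); // stand-in for real work
                    } finally {
                        phaser.arriveAndDeregister(); // signal completion, even on failure
                    }
                });
            }
            // The phase cannot advance until the producer arrives too, so it is
            // safe to keep registering tasks right up to this call.
            phaser.arriveAndAwaitAdvance();
            return processed.get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitAndWait(10));
    }
}
```

The number of tasks never has to be known up front, because each register() call adds one more party to wait for. One limit to keep in mind is that a single Phaser supports at most 65535 registered parties at a time.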
You could of course use a CountDownLatch protected by an AtomicReference so that your tasks get wrapped thus:
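import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

public class MyTask implements Runnable {
    private final Runnable r;
    private final AtomicReference<CountDownLatch> l;

    public MyTask(Runnable r, AtomicReference<CountDownLatch> l) {
        this.r = r;
        this.l = l;
    }

    public void run() {
        r.run();
        try {
            // Poll until the producer has published the latch.
            while (l.get() == null) Thread.sleep(1000L);
            l.get().countDown();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // handle Interrupted: restore the flag and exit
        }
    }
}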
Notice that the tasks run their work and then poll, sleeping between checks, until the latch is set (i.e. the total number of tasks is known). As soon as the latch is set, they count it down and exit. These get submitted as follows:
AtomicReference<CountDownLatch> l = new AtomicReference<CountDownLatch>();
executor.submit(new MyTask(r, l));
After the point of creation/submission of your work, when you know how many tasks you have created:
l.set(new CountDownLatch(nTasks));
l.get().await();
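Put together, the deferred-latch idea might run end to end roughly like this. It is only a sketch: the names (DeferredLatchDemo, latchRef, submitAndWait) are hypothetical, the work is a stand-in increment, and the poll interval is shortened to 10 ms so the demo finishes quickly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class DeferredLatchDemo {
    // Submits n trivial tasks, publishes the latch afterwards, and returns
    // how many tasks had run once the latch reached zero.
    static int submitAndWait(int n) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        AtomicReference<CountDownLatch> latchRef = new AtomicReference<>();
        AtomicInteger processed = new AtomicInteger();
        try {
            for (int i = 0; i < n; i++) {
                executor.submit(() -> {
                    processed.incrementAndGet(); // stand-in for real work
                    try {
                        // Poll until the producer has published the latch.
                        while (latchRef.get() == null) {
                            Thread.sleep(10L);
                        }
                        latchRef.get().countDown();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Only now is the total known, so the latch can be created and published.
            CountDownLatch latch = new CountDownLatch(n);
            latchRef.set(latch);
            latch.await();
            return processed.get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(submitAndWait(10));
    }
}
```

One design consequence worth noting: every task that finishes before the latch is published keeps its worker thread busy polling, so with a small pool no further work is processed until the producer has submitted everything. This works with an unbounded queue, as here, but a pool whose submit call can block the producer could stall.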