
Java producer-consumer with stopping condition

I have N workers that share a queue of elements to compute. At each iteration, each worker removes an element from the queue and may produce more elements to compute, which are put back in the same queue. In other words, each producer is also a consumer. The computation finishes when the queue is empty and every worker has finished computing its current element (so no further elements can be produced). I want to avoid a dispatcher/coordinator, so the workers have to coordinate among themselves. What is the best pattern to let a worker detect that the halting condition holds, and then halt the computation on behalf of the others?

For example, if every thread simply ran the loop below, all of them would block forever once every element had been computed:

while (true) {
    Element element = queue.take();               // blocks while the queue is empty
    List<Element> newElements = compute(element); // may be empty
    if (!newElements.isEmpty()) {
        queue.addAll(newElements);
    }
}
marcorossi asked May 16 '13 16:05


1 Answer

Maintain a count of active threads.

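import java.util.concurrent.atomic.AtomicInteger;

public class ThreadCounter {
    // N is the number of worker threads; all of them start out active
    public static final AtomicInteger threadCounter = new AtomicInteger(N);
    // number of elements currently in the queue
    public static final AtomicInteger queueCounter = new AtomicInteger(0);
    public static final Object poisonPill = new Object();
    public static volatile boolean cancel = false; // or use a final AtomicBoolean instead
}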

Your threads' polling loop should look like the following (I'm assuming that you're using a BlockingQueue):

while (!ThreadCounter.cancel) {
    int threadCount = ThreadCounter.threadCounter.decrementAndGet(); // decrement before blocking
    if (threadCount == 0 && ThreadCounter.queueCounter.get() == 0) {
        // no other thread is active and the queue is empty: shut everything down
        ThreadCounter.cancel = true;
        queue.offer(ThreadCounter.poisonPill);
    } else {
        Object obj = queue.take();
        ThreadCounter.threadCounter.incrementAndGet(); // increment when the thread is no longer blocking
        ThreadCounter.queueCounter.decrementAndGet();
        if (obj == ThreadCounter.poisonPill) {
            queue.offer(obj); // send the poison pill back through the queue so the other threads can read it
            continue;
        }
        // compute obj here; call queueCounter.incrementAndGet() for every new element added to the queue
    }
}

A thread decrements the counter when it is about to block on the BlockingQueue. If every other thread is already waiting on the queue (meaning that the counter has reached 0), the last thread sets cancel to true and sends a poison pill through the queue to wake up the others. Each thread that sees the poison pill sends it back through the queue to wake up the remaining threads, and then exits the loop when it sees that cancel is set to true.

Edit: I've removed the data race by adding a queueCounter that maintains a count of the number of objects in the queue (you'll also need to add a queueCounter.incrementAndGet() call wherever you add objects to the queue). This works as follows: if threadCount == 0 but queueCounter != 0, then a thread has just removed an item from the queue but has not yet called threadCounter.incrementAndGet(), so the cancel variable is not set to true. It's important that the threadCounter.incrementAndGet() call precede the queueCounter.decrementAndGet() call, otherwise you'll still have a data race. It shouldn't matter exactly where you call queueCounter.incrementAndGet() relative to adding the element, since within a thread that call is never interleaved with threadCounter.decrementAndGet(): the former happens while the thread is still computing, the latter only once it loops around and is about to block again.
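
The producing side mentioned above could be wrapped in a small helper. This is only a sketch: the enqueue and enqueueMany names and the CountedQueueDemo class are made up for illustration, and incrementing before adding is a choice of this sketch (it keeps the counter from ever being lower than the number of elements already published).

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class CountedQueueDemo {
    // Hypothetical helper: count the element first, then publish it.
    static <T> void enqueue(BlockingQueue<T> queue, AtomicInteger queueCounter, T item) {
        queueCounter.incrementAndGet();
        queue.add(item);
    }

    // Enqueues n integers through the helper and returns the counter value.
    static int enqueueMany(int n) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        AtomicInteger queueCounter = new AtomicInteger(0);
        for (int i = 0; i < n; i++) {
            enqueue(queue, queueCounter, i);
        }
        // With no consumer running, the counter and the queue size must agree.
        if (queue.size() != queueCounter.get()) {
            throw new IllegalStateException("counter out of sync with queue");
        }
        return queueCounter.get();
    }

    public static void main(String[] args) {
        System.out.println(enqueueMany(3));
    }
}
```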

Note that you can't use queueCounter alone to decide when to end the process, because a thread might still be active without yet having placed any data in the queue. In other words, queueCounter would be zero at that moment but would become non-zero once the thread finished its current iteration.
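
A possible variation, which is not the code above, is to fold both counters into one that counts elements that are either queued or still being computed, so the termination test becomes a single atomic operation. The sketch below assumes a made-up workload (an element of depth d > 0 produces two elements of depth d - 1) and hypothetical names (PendingCounterDemo, run, pending); the counter is decremented only after an element's children have been counted, so it can reach zero only when nothing is queued and nothing is in progress.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class PendingCounterDemo {
    private static final Object POISON_PILL = new Object();

    // Runs `workers` threads on a root element of the given depth and
    // returns how many elements were computed in total.
    static int run(int workers, int depth) {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        AtomicInteger pending = new AtomicInteger(1); // queued + in-progress; the root is counted up front
        AtomicInteger processed = new AtomicInteger(0);
        queue.add(depth);

        Runnable worker = () -> {
            try {
                while (true) {
                    Object obj = queue.take();
                    if (obj == POISON_PILL) {
                        queue.add(obj); // pass the pill on to the next blocked worker
                        return;
                    }
                    int d = (Integer) obj;
                    processed.incrementAndGet();
                    for (int i = 0; d > 0 && i < 2; i++) {
                        pending.incrementAndGet(); // count the child before publishing it
                        queue.add(d - 1);
                    }
                    // This element is finished; its children are already counted.
                    if (pending.decrementAndGet() == 0) {
                        queue.add(POISON_PILL); // nothing queued, nothing in progress
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(worker);
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 10)); // a depth-10 binary tree has 2^11 - 1 = 2047 elements
    }
}
```

The trade-off is that an idle worker no longer needs to announce that it is about to block; only the worker that finishes the last element triggers the shutdown.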

Instead of repeatedly sending the poisonPill through the queue, you can have the canceling thread send (N-1) poisonPills through the queue. Just be cautious if you use this approach with a different kind of queue, because some queues (e.g. Amazon's Simple Queue Service) may return multiple items from the equivalent of their take method, in which case you'll need to keep re-sending the poisonPill to ensure that everything shuts down.
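
As a rough illustration of the (N-1) pill variant with a java.util.concurrent BlockingQueue, where each take() returns exactly one item: the class and method names below are hypothetical, and the calling thread plays the role of the canceling thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class BroadcastPillsDemo {
    private static final Object POISON_PILL = new Object();

    // Starts n - 1 workers that block on the queue, sends n - 1 pills,
    // and returns how many workers exited after reading a pill.
    static int shutDown(int n) {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        AtomicInteger exited = new AtomicInteger(0);
        Thread[] others = new Thread[n - 1];
        for (int i = 0; i < others.length; i++) {
            others[i] = new Thread(() -> {
                try {
                    while (queue.take() != POISON_PILL) {
                        // real elements would be computed here
                    }
                    exited.incrementAndGet(); // the pill is consumed, not re-sent
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            others[i].start();
        }
        // The canceling thread sends one pill per remaining worker.
        for (int i = 0; i < n - 1; i++) {
            queue.add(POISON_PILL);
        }
        for (Thread t : others) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return exited.get();
    }

    public static void main(String[] args) {
        System.out.println(shutDown(4));
    }
}
```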

Additionally, instead of using a while (!cancel) loop, you can use a while (true) loop and break when the loop detects a poisonPill.
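
The shape of such a loop, reduced to a single thread draining a pre-filled queue, might look like this (BreakOnPillDemo and drain are illustrative names, not part of the answer's code):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class BreakOnPillDemo {
    private static final Object POISON_PILL = new Object();

    // Fills a queue with `items` elements followed by one pill and
    // returns how many elements were handled before the pill was seen.
    static int drain(int items) {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < items; i++) {
            queue.add(i);
        }
        queue.add(POISON_PILL);

        int handled = 0;
        try {
            while (true) {
                Object obj = queue.take();
                if (obj == POISON_PILL) {
                    queue.add(obj); // leave the pill for any other thread
                    break;          // no separate cancel flag needed
                }
                handled++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(drain(5));
    }
}
```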

Zim-Zam O'Pootertoot answered Sep 25 '22 00:09