Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Java producer/consumer, detecting end of processing

I'm preparing an application where a single producer generates several million tasks, which will then be processed by a configurable number of consumers. Communication from producer to consumer is (probably) going to be queue-based.

From the thread that runs the producer/generates the tasks, what method can I use to wait for completion of all tasks? I'd rather not resume to any periodic polling to see if my tasks queue is empty. In any case, the task queue being empty isn't actually a guarantee that the last tasks have completed. Those tasks can be relatively long-running, so it's quite possible that the queue is empty while the consumer threads are still happily processing.

Rgds, Maarten

like image 545
Maarten Boekhold Avatar asked Jun 30 '26 09:06

Maarten Boekhold


1 Answers

You might want to have a look at the java.util.concurrent package.

  • ExecutorService
  • Executors
  • Future

The executor framework already provides means to execute tasks via threadpool. The Future abstraction allows to wait for the completition of tasks.

Putting both together allows you coordinate the executions easily, decoupling tasks, activities (threads) and results.

Example:

    ExecutorService executorService = Executors.newFixedThreadPool(16);

    List<Callable<Void>> tasks = null;
    //TODO: fill tasks;

    //dispatch 
    List<Future<Void>> results =  executorService.invokeAll(tasks);

    //Wait until all tasks have completed
    for(Future<Void> result: results){
        result.get();
    }

Edit: Alternative Version using CountDownLatch

    ExecutorService executorService = Executors.newFixedThreadPool(16);

    final CountDownLatch latch;

    List<Callable<Void>> tasks = null;
    //TODO: fill tasks;

    latch = new CountDownLatch(tasks.size());

    //dispatch 
    executorService.invokeAll(tasks);

    //Wait until all tasks have completed
    latch.await();

And inside your tasks:

    Callable<Void> task = new Callable<Void>()
    {

        @Override
        public Void call() throws Exception
        {
            // TODO: do your stuff

            latch.countDown(); //<---- important part
            return null;
        }
    };
like image 51
b_erb Avatar answered Jul 02 '26 23:07

b_erb



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!