Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to know when a CompletionService is finished delivering results?

I want to use a CompletionService to process the results from a series of threads as they are completed. I have the service in a loop to take the Future objects it provides as they become available, but I don't know the best way to determine when all the threads have completed (and thus to exit the loop):

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

public class Bar {

    final static int MAX_THREADS = 4;
    final static int TOTAL_THREADS = 20;

    public static void main(String[] args) throws Exception{

        final ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(MAX_THREADS);
        final CompletionService<Integer> service = new ExecutorCompletionService<Integer>(threadPool);

        for (int i=0; i<TOTAL_THREADS; i++){
            service.submit(new MyCallable(i));
        }

        int finished = 0;
        Future<Integer> future = null;
        do{
            future = service.take();
            int result = future.get();
            System.out.println("  took: " + result);
            finished++;             

        }while(finished < TOTAL_THREADS);

        System.out.println("Shutting down");
        threadPool.shutdown();
    }


    public static class MyCallable implements Callable<Integer>{

        final int id;

        public MyCallable(int id){
            this.id = id;
            System.out.println("Submitting: " + id);
        }

        @Override
        public Integer call() throws Exception {
            Thread.sleep(1000);
            System.out.println("finished: " + id);
            return id;
        }
    }
}

I've tried checking the state of the ThreadPoolExecutor, but I know the getCompletedTaskCount and getTaskCount methods are only approximations and shouldn't be relied upon. Is there a better way to ensure that I've retrieved all the Futures from the CompletionService than counting them myself?


Edit: Both the link that Nobeh provided, and this link suggest that counting the number of tasks submitted, then calling take() that many times, is the way to go. I'm just surprised there isn't a way to ask the CompletionService or its Executor what's left to be returned.

like image 656
Ed Beaty Avatar asked Apr 03 '12 04:04

Ed Beaty


2 Answers

See http://www.javaspecialists.eu/archive/Issue214.html for a decent suggestion on how to extend the ExecutorCompletionService to do what you're looking for. I've pasted the relevant code below for your convenience. The author also suggests making the service implement Iterable, which I think would be a good idea.

FWIW, I agree with you that this really should be part of the standard implementation, but alas, it's not.

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

public class CountingCompletionService<V> extends ExecutorCompletionService<V> {
  private final AtomicLong submittedTasks = new AtomicLong();
  private final AtomicLong completedTasks = new AtomicLong();

  public CountingCompletionService(Executor executor) {
    super(executor);
  }

  public CountingCompletionService(
      Executor executor, BlockingQueue<Future<V>> queue) {
    super(executor, queue);
  }

  public Future<V> submit(Callable<V> task) {
    Future<V> future = super.submit(task);
    submittedTasks.incrementAndGet();
    return future;
  }

  public Future<V> submit(Runnable task, V result) {
    Future<V> future = super.submit(task, result);
    submittedTasks.incrementAndGet();
    return future;
  }

  public Future<V> take() throws InterruptedException {
    Future<V> future = super.take();
    completedTasks.incrementAndGet();
    return future;
  }

  public Future<V> poll() {
    Future<V> future = super.poll();
    if (future != null) completedTasks.incrementAndGet();
    return future;
  }

  public Future<V> poll(long timeout, TimeUnit unit)
      throws InterruptedException {
    Future<V> future = super.poll(timeout, unit);
    if (future != null) completedTasks.incrementAndGet();
    return future;
  }

  public long getNumberOfCompletedTasks() {
    return completedTasks.get();
  }

  public long getNumberOfSubmittedTasks() {
    return submittedTasks.get();
  }

  public boolean hasUncompletedTasks() {
    return completedTasks.get() < submittedTasks.get();
  }
}
like image 137
Mark Avatar answered Nov 15 '22 15:11

Mark


Answering to these questions gives you the answer?

  • Do your asynchronous tasks create other tasks submitted to CompletionService?
  • Is service the only object that is supposed to handle the tasks created in your application?

Based on reference documentation, CompletionService acts upon a consumer/producer approach and takes advantage of an internal Executor. So, as long as, you produce the tasks in one place and consume them in another place, CompletionService.take() will denote if there are any more results to give out.

I believe this question also helps you.

like image 36
nobeh Avatar answered Nov 15 '22 16:11

nobeh