How to implement an ExecutorService to execute batches of tasks

I am looking for a way to execute batches of tasks in Java. The idea is to have an ExecutorService based on a thread pool that lets me spread a set of Callables across different threads from a main thread. This class should provide a waitForCompletion method that puts the main thread to sleep until all tasks have executed. The main thread should then be woken up, perform some operations, and resubmit a new set of tasks.

This process will be repeated numerous times, so I would like to avoid ExecutorService.shutdown, as that would require creating multiple instances of ExecutorService.

Currently I have implemented it in the following way, using an AtomicInteger and a Lock/Condition:

public class BatchThreadPoolExecutor extends ThreadPoolExecutor {
  private final AtomicInteger mActiveCount;
  private final Lock          mLock;
  private final Condition     mCondition;

  public <C extends Callable<V>, V> Map<C, Future<V>> submitBatch(Collection<C> batch){
    ...
    for(C task : batch){
      submit(task);
      mActiveCount.incrementAndGet();
    }
  }

  @Override
  protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    mLock.lock();
    if (mActiveCount.decrementAndGet() == 0) {
      mCondition.signalAll();
    }
    mLock.unlock();
  }

  public void awaitBatchCompletion() throws InterruptedException {
    ...
    // Lock and wait until there is no active task
    mLock.lock();
    while (mActiveCount.get() > 0) {
      try {
        mCondition.await();
      } catch (InterruptedException e) {
        mLock.unlock();
        throw e;
      }
    }
    mLock.unlock();
  } 
}

Please note that I will not necessarily submit all the tasks of a batch at once, so CountDownLatch does not seem to be an option.

Is this a valid way to do it? Is there a more efficient/elegant way to implement that?

Thanks

Victor P. asked Apr 24 '12 12:04


2 Answers

I think ExecutorService itself already meets your requirements.

Call invokeAll([...]) with your batch of tasks. It blocks until every task has completed and returns one Future per task, so once you can iterate over all the Futures, all tasks are finished.
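
A minimal sketch of that approach, reusing one pool for several batches. The class name InvokeAllBatches, the helper runBatch, the pool size, and the doubling tasks are all made up for illustration and are not from the question:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAllBatches {

    // Hypothetical helper: runs one batch and returns the results in submission order.
    public static List<Integer> runBatch(ExecutorService pool, List<Callable<Integer>> batch)
            throws InterruptedException, ExecutionException {
        List<Integer> results = new ArrayList<>();
        // invokeAll returns only after every task in the batch has completed.
        for (Future<Integer> future : pool.invokeAll(batch)) {
            // The task is already done here; get() rethrows a task failure as ExecutionException.
            results.add(future.get());
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4); // pool size is an arbitrary choice
        try {
            List<Integer> inputs = Arrays.asList(1, 2, 3);
            for (int round = 0; round < 3; round++) {
                // Build the next batch from the results of the previous one.
                List<Callable<Integer>> batch = new ArrayList<>();
                for (int value : inputs) {
                    batch.add(() -> value * 2);
                }
                inputs = runBatch(pool, batch);
                System.out.println("round " + round + ": " + inputs);
            }
        } finally {
            pool.shutdown(); // only once, after the last batch
        }
    }
}
```

Note that invokeAll needs the whole batch up front, so it only fits if the tasks of a batch can be collected into a list before waiting.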

Christian Kuetbach answered Nov 01 '22 10:11


As the other answers point out, there doesn't seem to be any part of your use case that requires a custom ExecutorService.

It seems to me that all you need to do is submit a batch, wait for them all to finish while ignoring interrupts on the main thread, then submit another batch perhaps based on the results of the first batch. I believe this is just a matter of:

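    ExecutorService service = ...;

    // Submit the batch (tasks is the current collection of Callables)
    // and keep the Future returned for each task.
    Collection<Future<?>> futures = new HashSet<Future<?>>();
    for (Callable<?> callable : tasks) {
        Future<?> future = service.submit(callable);
        futures.add(future);
    }

    // Block until every task in the batch has finished.
    for (Future<?> future : futures) {
        try {
            future.get();
        } catch (InterruptedException e) {
            // Figure out if the interruption means we should stop.
        } catch (ExecutionException e) {
            // The task itself threw; e.getCause() is the original exception.
        }
    }

    // Use the results of futures to figure out a new batch of tasks.
    // Repeat the process with the same ExecutorService.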
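
If the tasks of a batch are not all submitted at once, the same idea still seems to work: record each Future as it is submitted and wait on all of them later. A rough sketch under that assumption follows. The IncrementalBatch class and its submit/awaitBatch methods are hypothetical names, and it assumes that only the main thread submits and waits:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical wrapper; not thread-safe, intended for use from a single (main) thread.
public class IncrementalBatch {
    private final ExecutorService pool;
    private final List<Future<Integer>> pending = new ArrayList<>();

    public IncrementalBatch(ExecutorService pool) {
        this.pool = pool;
    }

    // May be called any number of times before awaitBatch().
    public void submit(Callable<Integer> task) {
        pending.add(pool.submit(task));
    }

    // Blocks until every task submitted since the last call has finished.
    public List<Integer> awaitBatch() throws InterruptedException, ExecutionException {
        List<Integer> results = new ArrayList<>();
        try {
            for (Future<Integer> future : pending) {
                results.add(future.get());
            }
        } finally {
            // Forget this batch even on failure; tasks not yet waited for may still be running.
            pending.clear();
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4); // pool size is an arbitrary choice
        try {
            IncrementalBatch batch = new IncrementalBatch(pool);
            batch.submit(() -> 1);
            // ... other work on the main thread ...
            batch.submit(() -> 2);
            System.out.println(batch.awaitBatch());

            // The same pool and wrapper are reused for the next batch.
            batch.submit(() -> 3);
            System.out.println(batch.awaitBatch());
        } finally {
            pool.shutdown();
        }
    }
}
```

Because the pool itself is untouched, there is no need to subclass ThreadPoolExecutor or to keep a separate counter in step with afterExecute.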
sharakan answered Nov 01 '22 11:11