Controlling Task execution order with ExecutorService

I have a process which delegates asynchronous tasks to a pool of threads. I need to ensure that certain tasks are executed in order. For example, tasks arrive in this order:

a1, b1, c1, d1, e1, a2, a3, b2, f1

Tasks can be executed in any order except where there is a natural dependency, so a1, a2, a3 must be processed in that order, either by allocating them to the same thread or by blocking each one until I know the previous a# task has completed.

Currently it doesn't use the Java concurrency package (java.util.concurrent), but I'm considering changing it to take advantage of the thread management.

Does anyone have a similar solution, or suggestions on how to achieve this?

Wiretap asked Jan 28 '10 10:01



2 Answers

I wrote my own Executor that guarantees ordering for tasks with the same key. It keeps a map of queues, one per key, to order tasks that share a key. Each keyed task, when it finishes, submits the next task with the same key.

This solution doesn't handle RejectedExecutionException or other exceptions thrown by the delegate Executor, so the delegate Executor should be "unlimited", that is, it should never reject tasks.

    import java.util.HashMap;
    import java.util.LinkedList;
    import java.util.Map;
    import java.util.Queue;
    import java.util.concurrent.Executor;

    /**
     * This Executor guarantees task ordering for tasks with the same key
     * (keys must implement hashCode and equals correctly).
     */
    public class OrderingExecutor implements Executor {

        private final Executor delegate;
        private final Map<Object, Queue<Runnable>> keyedTasks = new HashMap<Object, Queue<Runnable>>();

        public OrderingExecutor(Executor delegate) {
            this.delegate = delegate;
        }

        @Override
        public void execute(Runnable task) {
            // a task without a key can be executed immediately
            delegate.execute(task);
        }

        public void execute(Runnable task, Object key) {
            if (key == null) { // if key is null, execute without ordering
                execute(task);
                return;
            }

            boolean first;
            Runnable wrappedTask;
            synchronized (keyedTasks) {
                Queue<Runnable> dependencyQueue = keyedTasks.get(key);
                first = (dependencyQueue == null);
                if (dependencyQueue == null) {
                    dependencyQueue = new LinkedList<Runnable>();
                    keyedTasks.put(key, dependencyQueue);
                }

                wrappedTask = wrap(task, dependencyQueue, key);
                if (!first)
                    dependencyQueue.add(wrappedTask);
            }

            // execute can block, so call it outside the synchronized block
            if (first)
                delegate.execute(wrappedTask);
        }

        private Runnable wrap(Runnable task, Queue<Runnable> dependencyQueue, Object key) {
            return new OrderedTask(task, dependencyQueue, key);
        }

        class OrderedTask implements Runnable {

            private final Queue<Runnable> dependencyQueue;
            private final Runnable task;
            private final Object key;

            public OrderedTask(Runnable task, Queue<Runnable> dependencyQueue, Object key) {
                this.task = task;
                this.dependencyQueue = dependencyQueue;
                this.key = key;
            }

            @Override
            public void run() {
                try {
                    task.run();
                } finally {
                    // hand over to the next queued task for this key, if any
                    Runnable nextTask = null;
                    synchronized (keyedTasks) {
                        if (dependencyQueue.isEmpty()) {
                            keyedTasks.remove(key);
                        } else {
                            nextTask = dependencyQueue.poll();
                        }
                    }
                    if (nextTask != null)
                        delegate.execute(nextTask);
                }
            }
        }
    }
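
On the caveat about rejection: the following is a minimal sketch of one delegate that fits the "unlimited" requirement, assuming a pool created with Executors.newFixedThreadPool. The JDK documents that pool as working off a shared unbounded queue, so tasks are queued rather than rejected while the pool is running (a pool that has been shut down still rejects). The class name UnboundedDelegateDemo and the helper countRejections are hypothetical and exist only for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

class UnboundedDelegateDemo {

    /** Submits `count` no-op tasks to a 2-thread pool and returns how many were rejected. */
    static int countRejections(int count) {
        // newFixedThreadPool is backed by an unbounded LinkedBlockingQueue
        ExecutorService delegate = Executors.newFixedThreadPool(2);
        int rejected = 0;
        try {
            for (int i = 0; i < count; i++) {
                try {
                    delegate.execute(() -> { });
                } catch (RejectedExecutionException e) {
                    rejected++;
                }
            }
        } finally {
            // stop accepting work, then wait for the queued tasks to drain
            delegate.shutdown();
            try {
                delegate.awaitTermination(10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + countRejections(10_000));
    }
}
```

A ThreadPoolExecutor built with a bounded queue and the default AbortPolicy would not be a safe delegate here, because a rejected follow-up task would silently stall every later task with the same key.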
Karry answered Sep 23 '22 18:09


When I've done this in the past I've usually had the ordering handled by a component which then submits callables/runnables to an Executor.

Something like this:

  • Get a list of tasks to run, some with dependencies
  • Create an Executor and wrap it with an ExecutorCompletionService
  • Search all tasks and schedule any with no dependencies via the completion service
  • Poll the completion service
  • As each task completes
    • Add it to a "completed" list
    • Re-evaluate any waiting tasks against the "completed" list to see if they are "dependency complete". If so, schedule them
    • Rinse and repeat until all tasks are submitted/completed

The completion service is a nice way of getting the tasks as they complete, rather than trying to poll a bunch of Futures. However, you will probably want to keep a Map<Future, TaskIdentifier> which is populated when a task is scheduled via the completion service, so that when the completion service gives you a completed Future you can figure out which TaskIdentifier it belongs to.

If you ever find yourself in a state where tasks are still waiting to run, but nothing is running and nothing can be scheduled, then you have a circular dependency problem.
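
The steps above can be sketched roughly as follows. This is an illustration under assumptions, not the code I actually used: the Task class, its String ids standing in for TaskIdentifier, and the runAll method are hypothetical names. It relies on ExecutorCompletionService handing back from take() the same Future object that submit() returned, which is what makes the Future-to-id map work.

```java
import java.util.*;
import java.util.concurrent.*;

class DependencyScheduler {

    /** Hypothetical task description: an id, the work, and the ids it depends on. */
    static final class Task {
        final String id;
        final Runnable work;
        final Set<String> dependsOn;

        Task(String id, Runnable work, String... dependsOn) {
            this.id = id;
            this.work = work;
            this.dependsOn = new HashSet<>(Arrays.asList(dependsOn));
        }
    }

    /** Runs all tasks, respecting dependencies; returns the ids in completion order. */
    static List<String> runAll(Collection<Task> tasks, ExecutorService pool) {
        CompletionService<Void> completion = new ExecutorCompletionService<>(pool);
        Map<Future<Void>, String> inFlight = new HashMap<>(); // Future -> task id
        List<Task> waiting = new ArrayList<>(tasks);
        Set<String> completed = new HashSet<>();
        List<String> completionOrder = new ArrayList<>();

        while (!waiting.isEmpty() || !inFlight.isEmpty()) {
            // schedule every waiting task whose dependencies have all completed
            for (Iterator<Task> it = waiting.iterator(); it.hasNext(); ) {
                Task t = it.next();
                if (completed.containsAll(t.dependsOn)) {
                    inFlight.put(completion.submit(t.work, null), t.id);
                    it.remove();
                }
            }
            // tasks are waiting, but nothing is running and nothing could be scheduled
            if (inFlight.isEmpty()) {
                throw new IllegalStateException("circular or unsatisfiable dependency");
            }
            try {
                Future<Void> done = completion.take(); // blocks until some task finishes
                done.get();                            // surfaces any exception from the task
                String id = inFlight.remove(done);
                completed.add(id);
                completionOrder.add(id);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            } catch (ExecutionException e) {
                throw new IllegalStateException("task failed", e.getCause());
            }
        }
        return completionOrder;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Task> tasks = Arrays.asList(
                    new Task("a1", () -> { }),
                    new Task("b1", () -> { }),
                    new Task("a2", () -> { }, "a1"),
                    new Task("a3", () -> { }, "a2"));
            System.out.println(runAll(tasks, pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

The loop only submits a task after everything it depends on has been taken from the completion service, so a2 is never handed to the pool before a1 has finished. A dependency on an id that is not in the task list is reported the same way as a cycle.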

Mike Q answered Sep 20 '22 18:09