Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Is there a way to put tasks back in the executor queue

I have a series of tasks (i.e. Runnables) to be executed by an Executor.
Each task requires a certain condition to be valid in order to proceed. I would be interested to know if there is a way to somehow configure Executor to move tasks in the end of the queue and try to execute them later when the condition would be valid and the task be able to execute and finish.
So the behavior be something like:

  1. Thread-1 take tasks from queue and run is called
  2. Inside run the condition is not yet valid
  3. Task stops and Thread-1 places task in the end of the queue and gets next task to execute
  4. Later on Thread-X (from thread pool) picks task again from queue condition is valid and task is being executed
like image 549
Cratylus Avatar asked Nov 20 '12 20:11

Cratylus


3 Answers

In Java 6, the ThreadPoolExecutor constructor takes a BlockingQueue<Runnable>, which is used to store the queued tasks. You can implement such a blocking queue which overrides the poll() so that if an attempt is made to remove and execute a "ready" job, then poll proceeds as normal. Otherwise the runnable is place at the back of the queue and you attempt to poll again, possibly after a short timeout.

like image 70
mbatchkarov Avatar answered Oct 12 '22 14:10

mbatchkarov


Unless you have to have busy waiting, you can add a repeating task to a ScheduledExecutorService with an appropriate polling interval which you cancel or kill after it is "valid" to run.

ScheduleExecutorService ses = ...

ses.scheduleAtFixedRate(new Runnable() {
    public void run() {
        if (!isValid()) return;
        preformTask();
        throw new RuntimeException("Last run");
    }
}, PERIOD, PERIOD, TimeUnit.MILLI_SECONDS);
like image 29
Peter Lawrey Avatar answered Oct 12 '22 14:10

Peter Lawrey


Create the executor first.

You have several possibilites.

If I suppose that your tasks implement a simple interface to query their status (something like an enum with 'NeedReschedule' or 'Completed'), then implement a wrapper (implementing Runnable) for your tasks which will take the task and the executor as instanciation parameters. This wrapper will run the task it is bound to, check its status afterwards, and if necessary reschedule a copy of itself in the executor before terminating.

Alternatively, you could use an execption mechanism to signal the wrapper that the task must be rescheduled. This solution is simpler, in the sense that it doesn't require a particular interface for you task, so that simple Runnable could be thrown in the system without trouble. However, exceptions incur more computation time (object construction, stack trace etc.).

Here's a possible implementation of the wrapper using the exception signaling mechanism. You need to implement the RescheduleException class extending Throwable, which may be fired by the wrapped runnable (no need for a more specific interface for the task in this setup). You could also use a simple RuntimeException as proposed in another answer, but you will have to test the message string to know if this is the exception you are waiting for.

 public class TaskWrapper implements Runnable {

    private final ExecutorService executor;
    private final Runnable task;       

    public TaskWrapper(ExecutorService e, Runnable t){
         executor = e;
         task = t;
    }

@Override
public void run() {

    try {
               task.run();
    } 
    catch (RescheduleException e) {
        executor.execute(this);
    }
}

Here's a very simple application firing up 200 wrapped tasks randomly asking a reschedule.

class Task implements Runnable {

  @Override
  public void run(){
     if (Maths.random() > 0.5)
      throw new RescheduleException();
   }     
}


public class Main {

public static void main(String[] args){

    ExecutorService executor = Executors.newFixedThreadPool(10);

    int i = 200;
            while(i--)
       executor.execute(new TaskWrapper(executor, new Task());
}
}

You could also have a dedicated thread to monitor the other threads results (using a message queue) and reschedule if necessary, but you lose one thread, compared to the other solution.

like image 31
didierc Avatar answered Oct 12 '22 16:10

didierc