
specifying ThreadPoolExecutor problem

Is there any way to create an Executor that always has at least 5 threads, at most 20 threads, and an unbounded queue for tasks (meaning no task is ever rejected)?

I tried new ThreadPoolExecutor(5, 20, 60L, TimeUnit.SECONDS, queue) with all possibilities that I thought of for queue:

new LinkedBlockingQueue() // never runs more than 5 threads
new LinkedBlockingQueue(1000000) // runs more than 5 threads only when there are more than 1000000 tasks waiting
new ArrayBlockingQueue(1000000) // runs more than 5 threads only when there are more than 1000000 tasks waiting
new SynchronousQueue() // no tasks can wait; once all 20 threads are busy, new tasks are rejected

and none worked as wanted.
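The first case can be reproduced with a minimal sketch. ThreadPoolExecutor only starts threads beyond the core size when the queue refuses a task, and an unbounded LinkedBlockingQueue never refuses one. The class and method names below (UnboundedQueueDemo, largestPoolSize, awaitQuietly) are made up for illustration; only the constructor arguments come from the question.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class UnboundedQueueDemo {

    /** Submits taskCount blocking tasks and reports how many threads the pool ever created. */
    static int largestPoolSize(int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                5, 20, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < taskCount; i++) {
                // Every task blocks, so all started threads stay busy.
                pool.execute(() -> awaitQuietly(release));
            }
            return pool.getLargestPoolSize();
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();     // queued tasks still run, then the threads exit
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // 100 tasks are waiting, yet the pool stays at its core size.
        System.out.println(largestPoolSize(100)); // prints 5
    }
}
```

The other 95 tasks simply sit in the queue until one of the 5 core threads becomes free.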

Sarmun asked Sep 16 '09 22:09


1 Answer

Maybe something like this would work for you? I just whipped it up, so please poke at it. Basically, it implements an overflow queue that is used to feed the underlying ThreadPoolExecutor.

There are two major drawbacks I see with it:

  • The lack of a returned Future object on submit(). But maybe that is not an issue for you.
  • The secondary queue only empties into the ThreadPoolExecutor when jobs are submitted. There has got to be an elegant solution, but I don't see it just yet. If you know that there will be a steady stream of tasks into the StusMagicExecutor, then this may not be an issue. ("May" being the key word.) One option might be to have your submitted tasks poke at the StusMagicExecutor after they complete.

Stu's Magic Executor:

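import static java.util.concurrent.TimeUnit.SECONDS;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;

public class StusMagicExecutor extends ThreadPoolExecutor {
    // Holds tasks the pool rejected; capacity is Integer.MAX_VALUE.
    private final BlockingQueue<Runnable> secondaryQueue = new LinkedBlockingQueue<Runnable>();

    public StusMagicExecutor() {
        // A SynchronousQueue stores nothing, so the pool grows to 20 threads and then rejects.
        super(5, 20, 60L, SECONDS, new SynchronousQueue<Runnable>(true), new RejectionHandler());
    }

    public void queueRejectedTask(Runnable task) {
        // offer() never blocks on an unbounded queue, so there is no InterruptedException to handle.
        secondaryQueue.offer(task);
    }

    @Override
    public Future<?> submit(Runnable newTask) {
        // Drain the secondary queue, which the rejection handler populates.
        Collection<Runnable> tasks = new ArrayList<Runnable>();
        secondaryQueue.drainTo(tasks);

        tasks.add(newTask);

        // Anything the pool still cannot take is rejected again and re-queued.
        for (Runnable task : tasks)
            super.submit(task);

        return null; // does not return a Future!
    }
}

class RejectionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
        // Only valid for StusMagicExecutor; any other executor fails the cast.
        ((StusMagicExecutor) executor).queueRejectedTask(runnable);
    }
}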
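The first drawback (no Future from submit()) might be avoided by wrapping the task in a FutureTask up front and handing that same object to execute(), so a task that is rejected and retried later still completes the Future the caller holds. This is only a sketch of that idea, not part of the executor above: the class name OverflowExecutor and the overflowSize() helper are hypothetical, and it uses Java 8 lambdas. The second drawback is unchanged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class OverflowExecutor extends ThreadPoolExecutor {
    // Unbounded holding area for tasks the pool could not accept immediately.
    private final BlockingQueue<Runnable> overflow = new LinkedBlockingQueue<>();

    public OverflowExecutor() {
        super(5, 20, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(true));
        // Park a rejected task instead of dropping it.
        setRejectedExecutionHandler((task, pool) -> overflow.add(task));
    }

    @Override
    public Future<?> submit(Runnable newTask) {
        // Wrap once, so the caller holds the same object that eventually runs.
        FutureTask<Void> future = new FutureTask<>(newTask, null);

        List<Runnable> tasks = new ArrayList<>();
        overflow.drainTo(tasks); // retry previously rejected tasks first
        tasks.add(future);

        for (Runnable task : tasks) {
            // execute() does not re-wrap the task; a rejection parks it in overflow again.
            execute(task);
        }
        return future;
    }

    /** Number of tasks currently parked and waiting for a later submit(). */
    public int overflowSize() {
        return overflow.size();
    }

    public static void main(String[] args) throws Exception {
        OverflowExecutor pool = new OverflowExecutor();
        Future<?> done = pool.submit(() -> System.out.println("ran"));
        done.get(); // blocks until the task has finished
        pool.shutdown();
    }
}
```

Parked tasks still move only when submit() is called again, and tasks passed straight to execute() or to the Callable overload of submit() bypass the drain step entirely.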
Stu Thompson answered Sep 25 '22 00:09