
ExecutorService that executes tasks sequentially but takes threads from a pool

I am trying to build an implementation of ExecutorService, let's call it SequentialPooledExecutor, with the following properties:

  1. All instances of SequentialPooledExecutor share the same thread pool

  2. Calls on the same instance of SequentialPooledExecutor are executed sequentially.

In other words, the instance waits for the currently executing task to finish before it starts processing the next task in its queue.

I am currently in the process of implementing the SequentialPooledExecutor myself, but I am wondering if I am reinventing the wheel. I looked into different implementations of ExecutorService, for example those that are provided by the Executors class, but I did not find one that meets my requirements.

Do you know whether there are existing implementations that I am missing, or should I continue implementing the interface myself?

EDIT:

I think my requirements were not very clear, so let me try to explain them in other words.

Suppose I have a series of sessions, say 1000 of them (what I was calling instances of the executor before). I can submit tasks to a session, and I want the guarantee that all tasks submitted to the same session are executed sequentially. However, tasks that belong to different sessions should not depend on each other.

I want to define an ExecutorService that executes these tasks using a bounded number of threads, let's say 200, while ensuring that a task is not started before the previous one in the same session has finished.

I don't know if there is anything existing that already does that, or if I should implement such an ExecutorService myself.

asked Oct 07 '16 08:10 by ichfarbstift


2 Answers

If you have thousands of keys which must be processed sequentially, but you don't have thousands of cores, you can use a hashing strategy to distribute the work like this:

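ExecutorService[] es = // many single-threaded executors, e.g. from Executors.newSingleThreadExecutor()

public <T> Future<T> submit(String key, Callable<T> task) {
    // The same key always maps to the same executor, so its tasks run in submission order.
    int h = Math.abs(key.hashCode() % es.length);
    return es[h].submit(task);
}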

In general, you only need about 2 * N threads to keep N cores busy if your tasks are CPU bound; more than that just adds overhead.
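For reference, the hashing strategy above could be fleshed out roughly as follows. This is only a minimal sketch: the class name KeyedExecutor, the "lane" terminology, and the shutdown method are made up for illustration, and Math.floorMod is used as one way to get a non-negative index. Note that sessions whose keys hash to the same lane will also wait on each other.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical helper: routes each key to one of a fixed set of single-threaded executors. */
public class KeyedExecutor {
    private final ExecutorService[] lanes;

    public KeyedExecutor(int laneCount) {
        lanes = new ExecutorService[laneCount];
        for (int i = 0; i < laneCount; i++) {
            lanes[i] = Executors.newSingleThreadExecutor();
        }
    }

    /** Tasks with the same key always land on the same lane, so they run in submission order. */
    public <T> Future<T> submit(String key, Callable<T> task) {
        // floorMod never returns a negative index, even for negative hash codes.
        int lane = Math.floorMod(key.hashCode(), lanes.length);
        return lanes[lane].submit(task);
    }

    /** Lets already submitted tasks finish, then stops all lanes. */
    public void shutdown() {
        for (ExecutorService lane : lanes) {
            lane.shutdown();
        }
    }
}
```

With this sketch, something like new KeyedExecutor(2 * Runtime.getRuntime().availableProcessors()) would follow the 2 * N rule of thumb for CPU-bound tasks.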

answered Oct 21 '22 14:10 by Peter Lawrey


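import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// ConcurrentHashMap, because submit() may be called from several threads at once.
private final Map<Integer, CompletableFuture<Void>> sessionTasks = new ConcurrentHashMap<>();
private final ExecutorService pool = Executors.newFixedThreadPool(200);

public void submit(int sessionId, Runnable task) {
    // compute() is atomic per key, so the check and the update cannot race.
    sessionTasks.compute(sessionId, (id, previous) -> previous == null
            ? CompletableFuture.runAsync(task, pool)   // first task of this session
            : previous.thenRunAsync(task, pool));      // runs after the previous task
}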

If a session has no task yet, a new task is created and run in the provided pool. If a session already has a task when a new one is added, the latter is chained (with thenRunAsync) to the previous one, ensuring order.
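One caveat with plain chaining: a stage attached with thenRunAsync is skipped when the previous stage completed exceptionally, so a single failing task would stop the rest of that session's chain. The following is a minimal sketch of one way around that, assuming it is acceptable for later tasks to run even after an earlier one failed. The class name SessionExecutor and the field name tails are hypothetical, and the sketch never removes finished sessions from the map.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;

/** Hypothetical sketch: per-session ordering on top of a shared pool. */
public class SessionExecutor {
    // Last submitted stage of each session; new tasks are chained onto it.
    private final Map<Integer, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();
    private final ExecutorService pool;

    public SessionExecutor(ExecutorService pool) {
        this.pool = pool;
    }

    /** Returns a future for this task; it fails if the task throws, but later tasks still run. */
    public CompletableFuture<Void> submit(int sessionId, Runnable task) {
        return tails.compute(sessionId, (id, tail) -> {
            if (tail == null) {
                // First task of this session: start it directly on the pool.
                return CompletableFuture.runAsync(task, pool);
            }
            // handle() turns a failed predecessor into a normal completion, so the chain continues.
            return tail.handle((result, error) -> null).thenRunAsync(task, pool);
        });
    }
}
```

The pool passed to the constructor could be the same Executors.newFixedThreadPool(200) shared by all sessions; each session then occupies at most one pool thread at a time.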

answered Oct 21 '22 15:10 by Spotted