Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Does Java provide an ExecutorService which allows a worker to execute on the same thread?

I am looking for an implementation of ExecutorService which will provide the following semantics. Each thread is occupied by a 'worker' that performs some task based on an input. Each worker is guaranteed to only execute in a single thread, thus, it should be allowed to maintain state from task to task, without the overhead of synchronisation, since it would be synchronising with itself, in a single thread.

So let's say I have 100 inputs, and 10 workers, I would like to be able to write something like:

for (Input input: inputs) {
    // The following code would pass t to all 10 workers,
    // each bound to their own thread,
    // and wait for them to complete.
    executor.invokeAll(input);
}

Note that each Worker does a different thing with any given input. The input is not a runnable block of code, it's just a parameter to the worker. Each worker decides what to do with the input. Though, to make it simpler, the workers implement an interface that would allow it to be called polymorphically, receiving the input.

I have hacked together something which works, using a Map<Worker, WorkerExecutor>, where WorkerExecutor is my thin wrapper around a Executors.newSingleThreadPool, and only a single instance of Worker will run in each thread pool. I'd prefer to find something written by someone who knows what they're doing :-)


Potential Ineffeciency I'm OK With

I realise this kind of semantics will result in inefficiency, however, I'm trying to get the most bang for my buck in terms of development time, and redesigning each implementation of Worker to be thread safe is non-trivial. The inefficiency I mean is that execution could/will look something like this (simulating max 2 active threads for this example):

         | Task 1    | Task 2    | Task 3    | Task 4    |
Worker 1 | =@        | =@        | =@        | =@        |
Worker 2 | ==@       | ==@       | ==@       | ==@       |
Worker 3 |   ==@     |   ==@     |   ==@     |   ==@     |
Worker 4 |    =====@ |    =====@ |    =====@ |    =====@ |

The problem being that after Worker 3 completes, there is no tasks left to do, and no work can be done until Worker 4 completes. That could be arbitrarily long amount of time that a CPU can be left idle.


Does such an ExecutorService exist?

like image 862
Grundlefleck Avatar asked Oct 22 '22 16:10

Grundlefleck


2 Answers

It sounds like what you actually want are actors. Put simply, an actor is an object that runs in a single thread and has a "mailbox" of tasks that it's responsible for processing sequentially. Akka seems to be the current leading library/framework providing actors on the JVM. Take a look over there.

like image 87
Ryan Stewart Avatar answered Oct 27 '22 10:10

Ryan Stewart


Something along the lines of:

import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

// you implement this for each of your non-parallelisable jobbies
interface Worker<T> {
    public void process(T input);
}

// implementation detail
class Clerk<T> {
    private final Executor executor = Executors.newSingleThreadExecutor();
    private final Worker<T> worker;

    public Clerk(Worker<T> worker) {
        this.worker = worker;
    }

    public void process(final T input) {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                worker.process(input);
            }
        });
    }
}

// make one of these, and give it all your workers, then give it input
class Workshop<T> {
    private final Set<Clerk<T>> clerks = new LinkedHashSet<Clerk<T>>();

    public void addWorker(Worker<T> worker) {
        // mutable; you love it
        clerks.add(new Clerk<T>(worker));
    }

    public void process(T input) {
        for (Clerk<T> clerk : clerks) {
            clerk.process(input);
        }
    }

    public void processAll(Iterable<T> inputs) {
        for (T input : inputs) {
            process(input);
        }
    }
}

Perhaps?

like image 37
Tom Anderson Avatar answered Oct 27 '22 08:10

Tom Anderson