I've implemented a java code to execute incoming tasks (as Runnable
) with n Threads based on their hashCode module nThreads
. The work should spread, ideally - uniformly, among those threads.
Specifically, we have a dispatchId
as a string for each Task.
Here is this java code snippet:
int nThreads = Runtime.getRuntime().availableProcessors(); // Number of threads
Worker[] workers = new Worker[nThreads]; // Those threads, Worker is just a thread class that can run incoming tasks
...
Worker getWorker(String dispatchId) { // Get a thread for this Task
return workers[(dispatchId.hashCode() & Integer.MAX_VALUE) % nThreads];
}
Important: In most cases a dispatchId is:
String dispatchId = 'SomePrefix' + counter.next()
But, I have a concern that modulo division by nThreads is not a good choice, because nThreads should be a prime number for a more uniform distribution of dispatId keys.
Are there any other options on how to spread the work better?
Update 1:
Each Worker has a queue:
Queue<RunnableWrapper> tasks = new ConcurrentLinkedQueue();
The worker gets tasks from it and executes them. Tasks can be added to this queue from other threads.
Update 2:
Tasks with the same dispatchId
can come in multiple times, therefore we need to find their thread by dispatchId
.
Most importantly, each Worker thread must process its incoming tasks sequentially. Hence, there is data structure Queue in the update 1 above.
Update 3:
Also, some threads can be busy, while others are free. Thus, we need to somehow decouple Queues from Threads, but maintain the FIFO order for the same dispatchId
for tasks execution.
Solution: I've implemented Ben Manes' idea (his answer below), the code can be found here.
All of the updates to the HashMap are completed before the threads are instantiated and the thread that creates the map also forks the threads. The threads are only using the HashMap in read-only mode – either get() or iteration without remove. There are no threads updating the map.
The HashMap is non-thread-safe and can not be used in a Concurrent multi-threaded environment. Comparatively, ConcurrentHashMap is a thread-safe and specially designed for use in multi-threaded and Concurrent environment.
By running instances or programs concurrently we ensure high throughput and higher performance as we can utilize the untapped resources like operating system hardware etc. For example, if a system has several CPUs, then the application can utilize these CPUs effectively and increase the throughput.
Multithreading in Java is a process of executing two or more threads simultaneously to maximum utilization of CPU. Multithreaded applications execute two or more threads run concurrently. Hence, it is also known as Concurrency in Java. Each thread runs parallel to each other.
It sounds like you need FIFO ordering per dispatch id, so the ideal would be to have dispatch queues as the abstraction. That would explain your concern about hashing as not providing uniform distribution, as some dispatch queues may be more active than others and unfairly balanced among workers. By separating the queue from the worker, you retain FIFO semantics and evenly spread out the work.
An inactive library that provides this abstraction is HawtDispatch. It is Java 6 compatible.
A very simple Java 8 approach is to use CompletableFuture as a queuing mechanism, ConcurrentHashMap for registration, and an Executor (e.g. ForkJoinPool) for computing. See EventDispatcher for an implementation of this idea, where registration is explicit. If your dispatchers are more dynamic then you may need to periodically prune the map. The basic idea is as follows.
ConcurrentMap<String, CompletableFuture<Void>> dispatchQueues = ...
public CompletableFuture<Void> dispatch(String queueName, Runnable task) {
return dispatchQueues.compute(queueName, (k, queue) -> {
return (queue == null)
? CompletableFuture.runAsync(task)
: queue.thenRunAsync(task);
});
}
Update (JDK7)
A backport of the above idea would be translated with Guava into something like,
ListeningExecutorService executor = ...
Striped<Lock> locks = Striped.lock(256);
ConcurrentMap<String, ListenableFuture<?>> dispatchQueues = ...
public ListenableFuture<?> dispatch(String queueName, final Runnable task) {
Lock lock = locks.get(queueName);
lock.lock();
try {
ListenableFuture<?> future = dispatchQueues.get(queueName);
if (future == null) {
future = executor.submit(task);
} else {
final SettableFuture<Void> next = SettableFuture.create();
future.addListener(new Runnable() {
try {
task.run();
} finally {
next.set(null);
}
}, executor);
future = next;
}
dispatchQueues.put(queueName, future);
} finally {
lock.unlock();
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With