 

Java: divide incoming work uniformly via hashing in multithreaded environments

I've implemented Java code that executes incoming tasks (as Runnables) on n threads, choosing the thread from the task's hashCode modulo nThreads. Ideally, the work should spread uniformly among those threads. Specifically, each task has a dispatchId string.

Here is the Java code snippet:

int nThreads = Runtime.getRuntime().availableProcessors(); // Number of worker threads
Worker[] workers = new Worker[nThreads]; // The worker threads; Worker is simply a thread class that runs incoming tasks
...
Worker getWorker(String dispatchId) { // Pick the worker thread for this task
    // Clear the sign bit so the remainder is never negative, then map the hash onto a worker index
    return workers[(dispatchId.hashCode() & Integer.MAX_VALUE) % nThreads];
}

Important: In most cases a dispatchId is:

String dispatchId = "SomePrefix" + counter.next();

However, I'm concerned that taking the hash modulo nThreads is not a good choice, because nThreads should be a prime number to get a more uniform distribution of dispatchId keys.

Are there any other options for spreading the work better?
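Whether a prime divisor matters for this key pattern can be checked empirically. The sketch below (HashSpread, bucket and bucketCounts are hypothetical names for illustration) applies the same `(hashCode() & Integer.MAX_VALUE) % nThreads` rule to sequential ids of the form "SomePrefix" + i and counts how many land on each worker, once for a power of two (8) and once for a prime (7). String.hashCode() weights the last character by 1, so ids that differ only in their final digit have consecutive hash codes; in runs of ten, the remainder therefore cycles through every bucket regardless of whether the divisor is prime.

```java
import java.util.Arrays;

// Hypothetical helper: measures how the question's bucketing rule spreads sequential dispatchIds.
public class HashSpread {
  // Same rule as getWorker(): clear the sign bit, then take the remainder.
  static int bucket(String dispatchId, int nThreads) {
    return (dispatchId.hashCode() & Integer.MAX_VALUE) % nThreads;
  }

  // Counts how many of `keys` ids (prefix + 0, prefix + 1, ...) fall into each bucket.
  static int[] bucketCounts(String prefix, int keys, int nThreads) {
    int[] counts = new int[nThreads];
    for (int i = 0; i < keys; i++) {
      counts[bucket(prefix + i, nThreads)]++;
    }
    return counts;
  }

  public static void main(String[] args) {
    // Compare a power-of-two and a prime thread count on the question's key pattern.
    for (int n : new int[] {8, 7}) {
      System.out.println(n + " threads: " + Arrays.toString(bucketCounts("SomePrefix", 100_000, n)));
    }
  }
}
```

If both rows come out roughly even, the modulus is not the problem; an uneven number or cost of tasks per dispatchId would then be the more likely cause of imbalance.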

Update 1:

Each Worker has a queue: Queue<RunnableWrapper> tasks = new ConcurrentLinkedQueue<>();

The worker gets tasks from it and executes them. Tasks can be added to this queue from other threads.
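A minimal sketch of such a Worker, assuming it is a Thread subclass that owns a ConcurrentLinkedQueue. Plain Runnable stands in for the question's RunnableWrapper, and the submit/shutdown methods and the LockSupport park/unpark wake-up scheme are assumptions, not the asker's code. Because ConcurrentLinkedQueue never blocks, an idle worker parks instead of spinning on poll().

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.LockSupport;

// Hypothetical Worker: a single thread draining its own queue, so its tasks run one at a time in FIFO order.
public class Worker extends Thread {
  private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
  private volatile boolean running = true;

  // May be called from any thread: enqueue, then wake the worker in case it is parked.
  public void submit(Runnable task) {
    tasks.add(task);
    LockSupport.unpark(this);
  }

  // Ask the worker to exit once its queue is empty.
  public void shutdown() {
    running = false;
    LockSupport.unpark(this);
  }

  @Override
  public void run() {
    while (running || !tasks.isEmpty()) {
      Runnable task = tasks.poll();
      if (task == null) {
        // If unpark() already happened, park() returns immediately, so a concurrent submit is not lost.
        LockSupport.park(this);
        continue;
      }
      try {
        task.run();
      } catch (RuntimeException e) {
        // Keep the worker alive if a single task fails.
        e.printStackTrace();
      }
    }
  }
}
```

With getWorker(dispatchId).submit(task), every task for a dispatchId reaches the same thread and runs in FIFO order, but a busy worker cannot hand its backlog to an idle one, which is the limitation Update 3 describes.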

Update 2:

Tasks with the same dispatchId can arrive multiple times, so we need to look up their thread by dispatchId.

Most importantly, each Worker thread must process its incoming tasks sequentially; hence the Queue data structure in Update 1 above.

Update 3:

Also, some threads can be busy while others are idle. Thus, we need to somehow decouple the queues from the threads while preserving FIFO execution order for tasks with the same dispatchId.

Solution: I've implemented Ben Manes' idea (his answer below); the code can be found here.

asked Apr 27 '15 at 07:04 by Ivan Voroshilin





1 Answer

It sounds like you need FIFO ordering per dispatch id, so the ideal abstraction would be dispatch queues. That would explain your concern about hashing not providing a uniform distribution: some dispatch queues may be more active than others and thus unfairly balanced among workers. By separating the queue from the worker, you retain FIFO semantics and spread the work evenly.
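Another JDK-only way to separate the queue from the worker is the SerialExecutor pattern shown in the java.util.concurrent.Executor Javadoc: each key gets a lightweight queue that hands one task at a time to a shared pool. The sketch below keeps one such executor per dispatchId; the class name KeyedSerialExecutor is hypothetical, and its map of per-key executors is never pruned, so long-lived dynamic keys would need cleanup.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;

// Hypothetical per-key FIFO dispatcher, modeled on the SerialExecutor example in the Executor Javadoc.
public class KeyedSerialExecutor {
  private final Executor pool;
  private final ConcurrentMap<String, SerialExecutor> queues = new ConcurrentHashMap<>();

  public KeyedSerialExecutor(Executor pool) {
    this.pool = pool;
  }

  // Tasks with the same dispatchId run one at a time, in submission order, on whichever pool thread is free.
  public void execute(String dispatchId, Runnable task) {
    queues.computeIfAbsent(dispatchId, k -> new SerialExecutor(pool)).execute(task);
  }

  static final class SerialExecutor implements Executor {
    private final Queue<Runnable> tasks = new ArrayDeque<>();
    private final Executor executor;
    private Runnable active;

    SerialExecutor(Executor executor) {
      this.executor = executor;
    }

    @Override
    public synchronized void execute(Runnable r) {
      tasks.add(() -> {
        try {
          r.run();
        } finally {
          scheduleNext(); // hand this key's next task to the pool only after this one finishes
        }
      });
      if (active == null) {
        scheduleNext();
      }
    }

    private synchronized void scheduleNext() {
      if ((active = tasks.poll()) != null) {
        executor.execute(active);
      }
    }
  }
}
```

Since no key is pinned to a thread, a busy dispatch queue occupies at most one pool thread at a time while the remaining threads keep serving other dispatchIds.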

An inactive library that provides this abstraction is HawtDispatch. It is Java 6 compatible.

A very simple Java 8 approach is to use CompletableFuture as a queuing mechanism, ConcurrentHashMap for registration, and an Executor (e.g. ForkJoinPool) for computing. See EventDispatcher for an implementation of this idea, where registration is explicit. If your dispatchers are more dynamic, then you may need to periodically prune the map. The basic idea is as follows:

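import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;

ConcurrentMap<String, CompletableFuture<Void>> dispatchQueues = ...

public CompletableFuture<Void> dispatch(String queueName, Runnable task) {
  // compute() runs atomically per key, so appending to the tail of a queue is race-free
  return dispatchQueues.compute(queueName, (k, queue) -> (queue == null)
      ? CompletableFuture.runAsync(task)
      // ignore a failed predecessor; otherwise one exception would skip every later task in the queue
      : queue.exceptionally(e -> null).thenRunAsync(task));
}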
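One possible way to prune the map, sketched here as an assumption and not necessarily how EventDispatcher handles it: when a queue's tail future completes, remove the entry only if it is still the tail, using ConcurrentMap.remove(key, value). The callback is attached after compute() returns, because it can run immediately on the calling thread and must not modify the map from inside compute(). The class name PruningDispatcher and the explicit Executor parameter are hypothetical additions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;

// Hypothetical self-pruning variant of dispatch(): an entry is dropped once its tail task has finished.
public class PruningDispatcher {
  private final ConcurrentMap<String, CompletableFuture<Void>> dispatchQueues = new ConcurrentHashMap<>();
  private final Executor executor;

  public PruningDispatcher(Executor executor) {
    this.executor = executor;
  }

  public CompletableFuture<Void> dispatch(String queueName, Runnable task) {
    // Chain onto the current tail (or start a new queue); a failed predecessor does not block later tasks.
    CompletableFuture<Void> tail = dispatchQueues.compute(queueName, (k, queue) -> (queue == null)
        ? CompletableFuture.runAsync(task, executor)
        : queue.exceptionally(e -> null).thenRunAsync(task, executor));
    // Attached outside compute(): the callback may run right away and must not modify the map re-entrantly.
    // remove(key, value) succeeds only if no newer task has been chained behind this one.
    return tail.whenComplete((ignored, error) -> dispatchQueues.remove(queueName, tail));
  }

  // Number of dispatch queues currently registered (for monitoring and tests).
  public int activeQueues() {
    return dispatchQueues.size();
  }
}
```

If a new task arrives just as the old tail finishes, it is either chained onto the already-completed future or starts a fresh chain; in both cases every earlier task for that key has already finished, so FIFO order is preserved.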

Update (JDK7)

Backported to JDK 7 with Guava, the above idea translates into something like:

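import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Striped;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;

ListeningExecutorService executor = ...
Striped<Lock> locks = Striped.lock(256);
ConcurrentMap<String, ListenableFuture<?>> dispatchQueues = ...

public ListenableFuture<?> dispatch(String queueName, final Runnable task) {
  Lock lock = locks.get(queueName); // striped by queue name, so unrelated queues rarely contend
  lock.lock();
  try {
    ListenableFuture<?> future = dispatchQueues.get(queueName);
    if (future == null) {
      future = executor.submit(task);
    } else {
      // run the task once the previous tail completes, whether it succeeded or failed
      final SettableFuture<Void> next = SettableFuture.create();
      future.addListener(new Runnable() {
        @Override
        public void run() {
          try {
            task.run();
          } finally {
            next.set(null);
          }
        }
      }, executor);
      future = next;
    }
    dispatchQueues.put(queueName, future);
    return future;
  } finally {
    lock.unlock();
  }
}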
answered Sep 28 '22 at 15:09 by Ben Manes
