Are there any implementations of a thread pool (in Java) that ensures all tasks for the same logical ID are executed on the same thread?
The logic I'm after is: if there is already a task being executed on a specific thread for a given logical ID, then new tasks with the same ID are scheduled on the same thread. If no thread is executing a task for the same ID, then any thread can be used.
This would allow tasks for unrelated IDs to be executed in parallel, but tasks for the same ID to be executed in serial and in the order submitted.
If not, are there any suggestions on how I might extend ThreadPoolExecutor to get this behaviour (if that's even possible)?
UPDATE
Having spent longer thinking about this, I don't actually require that tasks for the same logical ID get executed on the same thread, just that they don't get executed at the same time.
An example for this would be a system that processed orders for customers, where it was OK to process multiple orders at the same time, but not for the same customer (and all orders for the same customer had to be processed in order).
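A minimal sketch of one common way to meet the updated requirement (never concurrently and always in submission order, but not necessarily on the same thread): chain each ID's tasks as CompletableFutures on top of one shared pool, in the spirit of the SerialExecutor example in the java.util.concurrent.Executor Javadoc. KeyedSerialExecutor, submit and demo are hypothetical names, and this is a different technique from the custom-queue approach described next.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper: tasks for the same key run one at a time, in submission order,
// on a shared pool; tasks for different keys may run in parallel.
class KeyedSerialExecutor {
    private final Executor pool;
    // Latest (tail) future of each key's chain; removed once that chain has drained.
    private final ConcurrentHashMap<Object, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();

    KeyedSerialExecutor(Executor pool) {
        this.pool = pool;
    }

    CompletableFuture<Void> submit(Object key, Runnable task) {
        // compute() is atomic per key, so chaining order equals submission order.
        CompletableFuture<Void> next = tails.compute(key, (k, tail) ->
                tail == null
                        ? CompletableFuture.runAsync(task, pool)
                        // handle() ignores a failed predecessor so the chain keeps going.
                        : tail.handle((ignored, ex) -> null).thenRunAsync(task, pool));
        // Drop the map entry only if no newer task has been chained behind this one.
        next.whenComplete((ignored, ex) -> tails.remove(key, next));
        return next;
    }

    static List<String> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            KeyedSerialExecutor orders = new KeyedSerialExecutor(pool);
            List<String> log = Collections.synchronizedList(new ArrayList<>());
            CompletableFuture<Void> last = null;
            for (int i = 0; i < 5; i++) {
                int n = i;
                last = orders.submit("customer-1", () -> log.add("order-" + n));
            }
            last.join(); // completes only after every earlier task for "customer-1"
            return new ArrayList<>(log);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [order-0, order-1, order-2, order-3, order-4]
    }
}
```

Tasks for other customers go through their own chains, so they can still run in parallel on the shared pool.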
The approach I'm taking at the moment is to use a standard ThreadPoolExecutor with a customised BlockingQueue, and to wrap each Runnable in a custom wrapper. The Runnable wrapper logic is:
check the concurrent 'running' set (a ConcurrentHashMap) to see if a task for the same ID is currently running
The queue's poll() methods then only return tasks that have an ID that is not currently in the 'running' set.
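The selection logic described above might look roughly like this. It is a sketch under assumptions: KeyedTask and pollEligible are hypothetical names, the queue is guarded by one lock, and the ID is claimed at poll time while the wrapper only releases it afterwards. It deliberately leaves out the hard part, a BlockingQueue whose take() must wake up when an ID is released, which is where many of the corner cases live.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the 'running' set idea: only the selection step, not a full BlockingQueue.
class RunningSetSketch {
    // Wraps a task with its logical ID and frees the ID when the task finishes.
    static final class KeyedTask implements Runnable {
        final String id;
        private final Runnable body;
        private final Set<String> running;

        KeyedTask(String id, Runnable body, Set<String> running) {
            this.id = id;
            this.body = body;
            this.running = running;
        }

        @Override
        public void run() {
            try {
                body.run();
            } finally {
                running.remove(id); // the next queued task for this ID becomes eligible
            }
        }
    }

    // Removes and returns the oldest task whose ID is not running, claiming that ID.
    // Tasks for busy IDs stay queued in submission order. Returns null if none is eligible.
    static KeyedTask pollEligible(Deque<KeyedTask> queue, Set<String> running) {
        synchronized (queue) {
            for (Iterator<KeyedTask> it = queue.iterator(); it.hasNext(); ) {
                KeyedTask task = it.next();
                if (running.add(task.id)) { // atomic claim on a concurrent key set
                    it.remove();
                    return task;
                }
            }
            return null;
        }
    }

    static List<String> demo() {
        Set<String> running = ConcurrentHashMap.newKeySet();
        Deque<KeyedTask> queue = new ArrayDeque<>();
        for (String id : new String[] {"A", "A", "B"}) {
            queue.add(new KeyedTask(id, () -> { }, running));
        }
        List<String> picked = new ArrayList<>();
        KeyedTask first = pollEligible(queue, running);              // the first "A"
        picked.add(first.id);
        picked.add(pollEligible(queue, running).id);                 // skips the second "A", takes "B"
        picked.add(String.valueOf(pollEligible(queue, running)));    // "null": "A" is still running
        first.run();                                                 // finishing "A" frees its ID
        picked.add(pollEligible(queue, running).id);                 // now the second "A"
        return picked;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [A, B, null, A]
    }
}
```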
The trouble with this is that I'm sure there are going to be a lot of corner cases that I haven't thought about, so it's going to require a lot of testing.
Once a thread in the thread pool completes its task, it's returned to a queue of waiting threads. From this moment it can be reused. This reuse enables applications to avoid the cost of creating a new thread for each task.
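As a small illustration of that reuse (a sketch; ThreadReuseDemo is a hypothetical name), two tasks submitted one after the other to a one-thread pool both report the same Thread object:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ThreadReuseDemo {
    // Submits two tasks to a one-thread pool and checks whether both ran on the same worker thread.
    static boolean sameWorker() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Callable<Thread> whoRunsMe = Thread::currentThread;
        try {
            Future<Thread> first = pool.submit(whoRunsMe);
            Future<Thread> second = pool.submit(whoRunsMe);
            return first.get() == second.get(); // same Thread object, so the worker was reused
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sameWorker()); // true
    }
}
```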
Bounded thread pools allow the programmer to specify an upper limit on the number of threads that can concurrently execute in a thread pool. Programs must not use threads from a bounded thread pool to execute tasks that depend on the completion of other tasks in the pool.
Java Thread Pool. A Java thread pool represents a group of worker threads that wait for jobs and are reused many times. In a fixed-size thread pool, a fixed number of threads is created up front. A thread is pulled from the pool and assigned a job by the service provider.
As opposed to the cached thread pool, this one uses an unbounded queue with a fixed number of never-expiring threads. Therefore, instead of an ever-increasing number of threads, the fixed thread pool tries to execute incoming tasks with a fixed number of threads.
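For reference, a sketch of the two configurations being contrasted, built directly with ThreadPoolExecutor. The constructor arguments mirror how OpenJDK's Executors.newFixedThreadPool and Executors.newCachedThreadPool create their pools; PoolShapes and its methods are hypothetical names.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolShapes {
    // Fixed pool: n core threads that never time out, feeding off an unbounded queue.
    static ThreadPoolExecutor fixed(int n) {
        return new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    // Cached pool: no core threads, unbounded maximum, idle threads expire after 60 seconds,
    // and a SynchronousQueue hands each task straight to a (possibly new) thread.
    static ThreadPoolExecutor cached() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
    }

    // Summarises a pool's shape and shuts it down (no tasks were submitted, so no threads exist).
    static String describe(ThreadPoolExecutor pool) {
        String shape = "core=" + pool.getCorePoolSize()
                + " max=" + pool.getMaximumPoolSize()
                + " queue=" + pool.getQueue().getClass().getSimpleName();
        pool.shutdown();
        return shape;
    }

    public static void main(String[] args) {
        System.out.println(describe(fixed(4))); // core=4 max=4 queue=LinkedBlockingQueue
        System.out.println(describe(cached())); // core=0 max=2147483647 queue=SynchronousQueue
    }
}
```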
Create an array of executor services, each running one thread, and assign your queue entries to them by the hash code of your item ID. The array can be of any size, depending on the maximum number of threads you want to use.
This restricts what we can use from the executor service, but still lets us use its ability to shut down the single thread when it is no longer needed (with allowCoreThreadTimeOut(true)) and restart it as required. Also, all the queuing machinery works without rewriting it.
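A sketch of this striping idea, assuming each stripe is a ThreadPoolExecutor with one core and one maximum thread. Because Executors.newSingleThreadExecutor() is documented as not reconfigurable, the sketch builds each one-thread ThreadPoolExecutor directly so that allowCoreThreadTimeOut(true) can be called. StripedExecutor, submit and demo are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical striped executor: one single-thread pool per stripe; an ID always maps to the same stripe.
class StripedExecutor {
    private final ThreadPoolExecutor[] stripes;

    StripedExecutor(int stripeCount) {
        stripes = new ThreadPoolExecutor[stripeCount];
        for (int i = 0; i < stripeCount; i++) {
            // core = max = 1 makes each stripe strictly serial and FIFO.
            // The keep-alive must be > 0, otherwise allowCoreThreadTimeOut(true) throws.
            stripes[i] = new ThreadPoolExecutor(1, 1, 30L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
            stripes[i].allowCoreThreadTimeOut(true); // idle thread exits; a new one starts on the next task
        }
    }

    Future<?> submit(Object id, Runnable task) {
        // floorMod keeps the index non-negative for negative hash codes.
        return stripes[Math.floorMod(id.hashCode(), stripes.length)].submit(task);
    }

    void shutdown() {
        for (ThreadPoolExecutor stripe : stripes) {
            stripe.shutdown();
        }
    }

    static List<String> demo() {
        StripedExecutor executor = new StripedExecutor(4);
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        Future<?> last = null;
        for (int i = 0; i < 5; i++) {
            int n = i;
            last = executor.submit("customer-1", () -> log.add("order-" + n));
        }
        try {
            last.get(); // same stripe, FIFO: every earlier order has finished too
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [order-0, order-1, order-2, order-3, order-4]
    }
}
```

The trade-off: unrelated IDs that hash to the same stripe are serialized too, so one slow ID can delay others on its stripe.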
The simplest idea could be this:
Have a fixed map of BlockingQueues. Use a hash mechanism to pick a queue based on the task ID; the hash algorithm should pick the same queue for the same IDs. Start a single thread for every queue. Every thread picks tasks from its own dedicated queue and executes them.
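A minimal sketch of that hand-rolled variant, assuming a list of LinkedBlockingQueues and daemon worker threads; QueuePerThreadPool, execute and shutdownNow are hypothetical names (shutdownNow here only interrupts the workers). Unlike the ThreadPoolExecutor-based array, it has to handle thread lifecycle and task failures itself.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical hand-rolled pool: a fixed list of queues, each drained by one dedicated thread.
class QueuePerThreadPool {
    private final List<BlockingQueue<Runnable>> queues = new ArrayList<>();
    private final List<Thread> workers = new ArrayList<>();

    QueuePerThreadPool(int threadCount) {
        for (int i = 0; i < threadCount; i++) {
            BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
            Thread worker = new Thread(() -> {
                try {
                    while (true) {
                        Runnable task = queue.take(); // only ever reads its own queue
                        try {
                            task.run();
                        } catch (RuntimeException e) {
                            e.printStackTrace(); // keep the worker alive if a task fails
                        }
                    }
                } catch (InterruptedException e) {
                    // interrupted by shutdownNow(): let the thread end
                }
            }, "queue-worker-" + i);
            worker.setDaemon(true);
            queues.add(queue);
            workers.add(worker);
            worker.start();
        }
    }

    void execute(Object id, Runnable task) {
        // The same ID always hashes to the same queue, and therefore to the same thread.
        queues.get(Math.floorMod(id.hashCode(), queues.size())).add(task);
    }

    void shutdownNow() {
        workers.forEach(Thread::interrupt);
    }

    static List<String> demo() {
        QueuePerThreadPool pool = new QueuePerThreadPool(3);
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(3);
        for (int i = 0; i < 3; i++) {
            int n = i;
            pool.execute("customer-1", () -> {
                log.add(Thread.currentThread().getName() + ":order-" + n);
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // three entries, all from the same queue-worker thread, in order
    }
}
```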
p.s. The appropriate solution strongly depends on the type of work you assign to threads.
UPDATE
OK, how about this crazy idea? Please bear with me :)
Say we have a ConcurrentHashMap which holds references id -> OrderQueue:
ID1->Q1, ID2->Q2, ID3->Q3, ...
This means every id is now associated with its own queue. OrderQueue is a custom blocking queue with an additional boolean flag, isAssociatedWithWorkingThread.
There is also a regular BlockingQueue, which we will call amortizationQueue for now; you'll see its use later.
Next, we have N working threads. Every working thread has its own working queue, a BlockingQueue containing the ids associated with this thread.
When a new id comes, we do the following:
create a new OrderQueue(isAssociatedWithWorkingThread=false)
put the task into the queue
put id->OrderQueue into the map
put this OrderQueue into amortizationQueue
When an update for an existing id comes, we do the following:
get the OrderQueue from the map
put the task into the queue
if isAssociatedWithWorkingThread == false, put this OrderQueue into amortizationQueue
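A sketch of the two submit paths above, assuming an AtomicBoolean for the flag (the answer leaves that choice open at the end) and swapping "create, put the task, put into the map" for ConcurrentHashMap.computeIfAbsent, so that two submitters racing on a brand-new id cannot create two queues. OrderDispatcher and demo are hypothetical names; OrderQueue, map and amortizationQueue follow the answer's naming.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the submit side of the design.
class OrderDispatcher {
    // Per-id FIFO of tasks plus the isAssociatedWithWorkingThread flag (AtomicBoolean is an assumption).
    static final class OrderQueue extends LinkedBlockingQueue<Runnable> {
        final String id;
        final AtomicBoolean isAssociatedWithWorkingThread = new AtomicBoolean(false);

        OrderQueue(String id) {
            this.id = id;
        }
    }

    final ConcurrentHashMap<String, OrderQueue> map = new ConcurrentHashMap<>();
    final BlockingQueue<OrderQueue> amortizationQueue = new LinkedBlockingQueue<>();

    void submit(String id, Runnable task) {
        // computeIfAbsent creates the OrderQueue and puts it in the map atomically.
        OrderQueue orders = map.computeIfAbsent(id, OrderQueue::new);
        orders.add(task);
        // A new queue starts unassociated, so this one check covers both the new-id and existing-id paths.
        if (!orders.isAssociatedWithWorkingThread.get()) {
            amortizationQueue.add(orders);
        }
    }

    static String demo() {
        OrderDispatcher dispatcher = new OrderDispatcher();
        dispatcher.submit("ID1", () -> { });
        dispatcher.submit("ID1", () -> { });
        dispatcher.submit("ID2", () -> { });
        return "queues=" + dispatcher.map.size()
                + " ID1.tasks=" + dispatcher.map.get("ID1").size()
                + " amortization=" + dispatcher.amortizationQueue.size();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // queues=2 ID1.tasks=2 amortization=3
    }
}
```

ID1's queue lands in amortizationQueue twice here; the amortization thread's "not empty and not yet associated" check is what makes such duplicates harmless.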
Every working thread does the following:
take the next id from the working queue
take the OrderQueue associated with this id from the map
take all tasks from this queue
execute them
mark isAssociatedWithWorkingThread=false for this OrderQueue
put this OrderQueue into amortizationQueue
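One pass of that loop might look like the following sketch, again assuming an AtomicBoolean flag; WorkerStep, processNext and demo are hypothetical names, and drainTo stands in for "take all tasks from this queue". The final put back onto amortizationQueue is what catches tasks added while the batch was running, because their submitter saw the flag still set and did not enqueue the OrderQueue itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of one iteration of a working thread.
class WorkerStep {
    static final class OrderQueue extends LinkedBlockingQueue<Runnable> {
        final String id;
        final AtomicBoolean isAssociatedWithWorkingThread = new AtomicBoolean(false);

        OrderQueue(String id) {
            this.id = id;
        }
    }

    static void processNext(BlockingQueue<String> workingQueue,
                            Map<String, OrderQueue> map,
                            BlockingQueue<OrderQueue> amortizationQueue) throws InterruptedException {
        String id = workingQueue.take();   // next id from this thread's working queue
        OrderQueue orders = map.get(id);   // the OrderQueue associated with it
        List<Runnable> batch = new ArrayList<>();
        orders.drainTo(batch);             // take all tasks queued so far
        for (Runnable task : batch) {
            task.run();                    // execute them in submission order
        }
        orders.isAssociatedWithWorkingThread.set(false);
        // Hand the queue back: tasks that arrived while the batch ran are still in it.
        amortizationQueue.put(orders);
    }

    static String demo() {
        Map<String, OrderQueue> map = new ConcurrentHashMap<>();
        BlockingQueue<String> workingQueue = new LinkedBlockingQueue<>();
        BlockingQueue<OrderQueue> amortizationQueue = new LinkedBlockingQueue<>();
        List<String> log = new ArrayList<>();

        OrderQueue orders = new OrderQueue("ID1");
        for (String name : new String[] {"a", "b", "c"}) {
            orders.add(() -> log.add(name));
        }
        orders.isAssociatedWithWorkingThread.set(true); // as if the amortization thread assigned it
        map.put("ID1", orders);
        workingQueue.add("ID1");

        try {
            processNext(workingQueue, map, amortizationQueue);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return log + " associated=" + orders.isAssociatedWithWorkingThread.get()
                + " amortization=" + amortizationQueue.size();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [a, b, c] associated=false amortization=1
    }
}
```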
Pretty straightforward. Now to the fun part - work stealing :)
If at some point a working thread finds itself with an empty working queue, it does the following:
go to the pool of all working threads
pick one (say, the one with the longest working queue)
steal an id from *the tail* of that thread's working queue
put this id into its own working queue
continue with regular execution
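A sketch of the stealing step, assuming each working queue is a LinkedBlockingDeque (the answer only says BlockingQueue, and a plain BlockingQueue has no way to remove from the tail). WorkStealing, steal and demo are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical sketch of the stealing step.
class WorkStealing {
    // Moves one id from the tail of the longest other working queue into `mine`.
    // Returns the stolen id, or null if there was nothing to steal.
    static String steal(LinkedBlockingDeque<String> mine, List<LinkedBlockingDeque<String>> all) {
        LinkedBlockingDeque<String> victim = null;
        for (LinkedBlockingDeque<String> candidate : all) {
            // Sizes can change concurrently, so "longest" is only a best-effort choice.
            if (candidate != mine && (victim == null || candidate.size() > victim.size())) {
                victim = candidate;
            }
        }
        if (victim == null) {
            return null;
        }
        String id = victim.pollLast(); // tail: the owner keeps taking from the head
        if (id != null) {
            mine.offerLast(id);
        }
        return id;
    }

    static String demo() {
        LinkedBlockingDeque<String> mine = new LinkedBlockingDeque<>();
        LinkedBlockingDeque<String> small = new LinkedBlockingDeque<>(Arrays.asList("ID1"));
        LinkedBlockingDeque<String> big = new LinkedBlockingDeque<>(Arrays.asList("ID2", "ID3", "ID4"));
        String stolen = steal(mine, Arrays.asList(mine, small, big));
        return "stole=" + stolen + " mine=" + mine + " victim=" + big;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // stole=ID4 mine=[ID4] victim=[ID2, ID3]
    }
}
```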
And there is also one additional thread, which does the amortization work:
while (true)
  take the next OrderQueue from amortizationQueue
  if the queue is not empty and isAssociatedWithWorkingThread == false:
    set isAssociatedWithWorkingThread=true
    pick any working thread and add the id to its working queue
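One pass of that loop, as a sketch: it assumes an AtomicBoolean flag and uses compareAndSet(false, true) so the check and the set happen in one atomic step, which is one possible answer to the open question below, not a proof that it is sufficient for the whole design. AmortizationStep, amortizeOnce and demo are hypothetical names, and "any working thread" is picked at random here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of one pass of the amortization thread's loop.
class AmortizationStep {
    static final class OrderQueue extends LinkedBlockingQueue<Runnable> {
        final String id;
        final AtomicBoolean isAssociatedWithWorkingThread = new AtomicBoolean(false);

        OrderQueue(String id) {
            this.id = id;
        }
    }

    static void amortizeOnce(BlockingQueue<OrderQueue> amortizationQueue,
                             List<BlockingQueue<String>> workingQueues) throws InterruptedException {
        OrderQueue orders = amortizationQueue.take();
        // compareAndSet checks the flag and sets it in one atomic step.
        if (!orders.isEmpty() && orders.isAssociatedWithWorkingThread.compareAndSet(false, true)) {
            // "Pick any working thread": here, a random one.
            int pick = ThreadLocalRandom.current().nextInt(workingQueues.size());
            workingQueues.get(pick).put(orders.id);
        }
    }

    static String demo() {
        BlockingQueue<OrderQueue> amortizationQueue = new LinkedBlockingQueue<>();
        List<BlockingQueue<String>> workingQueues = new ArrayList<>();
        workingQueues.add(new LinkedBlockingQueue<>());
        workingQueues.add(new LinkedBlockingQueue<>());

        OrderQueue orders = new OrderQueue("ID1");
        orders.add(() -> { });
        amortizationQueue.add(orders);
        amortizationQueue.add(orders); // duplicate entry, as the submit path can produce
        try {
            amortizeOnce(amortizationQueue, workingQueues);
            amortizeOnce(amortizationQueue, workingQueues); // skipped: flag already set
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        int assigned = workingQueues.get(0).size() + workingQueues.get(1).size();
        return "assigned=" + assigned + " associated=" + orders.isAssociatedWithWorkingThread.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // assigned=1 associated=true
    }
}
```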
I'll have to spend more time thinking about whether you can get away with an AtomicBoolean for the isAssociatedWithWorkingThread flag, or whether checking/changing this flag needs to be a blocking operation.