I have a queue of tasks that operate on a collection of objects (let's say the objects are entries in an address book, for the sake of example).
An example task might be "Update Joe's phone number to 888-555-1212".
It's possible to have multiple "Update Joe's phone number..." tasks in the queue simultaneously, but with different phone numbers. In this case, the updates must be applied in order to ensure the state is correct at the end (and no, for the sake of argument, it is not possible to put timestamps on the tasks and timestamps on the address book entries and throw away stale tasks).
It is safe to apply an update for Jane out-of-order with an update for Joe.
I would like to multithread processing of the queue, but I need to synchronize access by person.
Is there a handy library for this kind of thing? Or am I relegated to using an Executor and doing my own synchronization on "name" in the Runnable's run() method?
A straightforward, but not quite perfect, solution to this problem is to maintain an array of sub queues, one per processing thread. A single master thread pulls items off your single main queue and adds each one to the sub queue whose index is the object key's hashCode modulo the number of sub queues (the key being whatever identifies and relates your tasks).
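For example (Math.floorMod rather than %, because hashCode() can be negative and % would then produce a negative index):
int queueIndex = Math.floorMod(myEntity.getKey().hashCode(), queues.length);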
Only one thread processes each sub queue, and all tasks for the same entity are submitted to the same sub queue, so there are no race conditions between tasks for that entity.
This solution is imperfect because some threads may end up with larger queues than others. In practice this is unlikely to matter, but it is something to consider.
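Here is a minimal Java sketch of the sub-queue idea, assuming each sub queue is an Executors.newSingleThreadExecutor() (exactly one worker thread draining its own FIFO queue) and that whoever calls dispatch plays the single master thread. StripedDispatcher, dispatch, demo and the extra phone numbers are hypothetical, made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Sketch: one single-threaded executor per sub queue; a given key always maps to the same one. */
class StripedDispatcher {
    private final ExecutorService[] queues;

    StripedDispatcher(int threads) {
        queues = new ExecutorService[threads];
        for (int i = 0; i < threads; i++) {
            // Each single-thread executor owns one FIFO queue and exactly one worker thread.
            queues[i] = Executors.newSingleThreadExecutor();
        }
    }

    /** Called by the master thread in the order tasks come off the main queue. */
    void dispatch(Object key, Runnable task) {
        // floorMod keeps the index non-negative even when hashCode() is negative.
        int queueIndex = Math.floorMod(key.hashCode(), queues.length);
        queues[queueIndex].execute(task);
    }

    void shutdownAndWait() throws InterruptedException {
        for (ExecutorService q : queues) q.shutdown();
        for (ExecutorService q : queues) q.awaitTermination(1, TimeUnit.MINUTES);
    }

    /** Hypothetical demo: records the phone numbers applied to each person, in application order. */
    static Map<String, List<String>> demo() throws InterruptedException {
        Map<String, List<String>> applied = new ConcurrentHashMap<>();
        StripedDispatcher dispatcher = new StripedDispatcher(4);
        String[][] tasks = {
            {"Joe", "888-555-1212"}, {"Jane", "888-555-0000"},
            {"Joe", "888-555-3434"}, {"Joe", "888-555-5656"}, {"Jane", "888-555-1111"}
        };
        for (String[] t : tasks) {
            String name = t[0], phone = t[1];
            dispatcher.dispatch(name, () -> applied
                    .computeIfAbsent(name, k -> Collections.synchronizedList(new ArrayList<>()))
                    .add(phone));
        }
        dispatcher.shutdownAndWait();
        return applied;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo().get("Joe")); // prints [888-555-1212, 888-555-3434, 888-555-5656]
    }
}
```

Because every task for Joe lands on the same single-threaded executor, Joe's updates are applied in submission order regardless of how many sub queues exist, while Jane's updates can run in parallel on another one.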
Issues with the simpler solution:
The simpler solution of pulling items off of a single queue and then locking on something distinct for the affected entity has a race condition (as Aurand pointed out). Given:
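Master Queue [ Task1(entity1), Task2(entity1), ... ]
where task1 and task2 both edit the same entity, entity1, and thread1 and thread2 are both operating on the queue, the expected/desired sequence of events is:
1. thread1 takes task1 from the queue and locks entity1
2. thread1 edits entity1 and unlocks it
3. thread2 takes task2 from the queue, locks entity1, edits it and unlocks it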
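Unfortunately, even if the lock is the first statement of the thread's run method, it is possible for the following sequence to occur:
1. thread1 takes task1 from the queue
2. thread2 takes task2 from the queue
3. thread2 locks entity1, edits it and unlocks it
4. thread1 locks entity1, edits it and unlocks it, so the older update from task1 is applied last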
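To see how locking inside run() can still reorder task1 and task2, here is a contrived Java illustration. The CountDownLatch is only a stand-in for the scheduler pausing thread1 after it has taken task1 but before it reaches its lock; LockInRunRaceDemo and its variable names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/** Illustration only: forces the bad interleaving deterministically with a latch. */
class LockInRunRaceDemo {
    static List<String> demo() throws InterruptedException {
        Object entity1Lock = new Object();              // the per-entity lock taken in run()
        List<String> appliedOrder = new ArrayList<>();  // guarded by entity1Lock
        CountDownLatch thread2Finished = new CountDownLatch(1);

        // thread1 has already taken task1 off the master queue...
        Thread thread1 = new Thread(() -> {
            try {
                thread2Finished.await();  // ...but is descheduled before it reaches the lock
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            synchronized (entity1Lock) {
                appliedOrder.add("task1");
            }
        });
        // thread2 took task2 afterwards, yet wins the race for the entity lock.
        Thread thread2 = new Thread(() -> {
            synchronized (entity1Lock) {
                appliedOrder.add("task2");
            }
            thread2Finished.countDown();
        });

        thread1.start();
        thread2.start();
        thread1.join();
        thread2.join();
        synchronized (entity1Lock) {
            return new ArrayList<>(appliedOrder);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // prints [task2, task1]
    }
}
```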
To avoid this, each thread would have to lock on something (say the queue) before taking a task from it, and then acquire a lock on the entity while still holding that parent lock. However, you do not want to block everything while holding the parent lock and waiting for the entity lock, so you can only try for the entity lock and must then handle the case where it cannot be acquired (by putting the task into another queue, perhaps). Overall the situation becomes non-trivial.
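One way to make the parent-lock/try-lock idea concrete is the sketch below, under a few assumptions of mine: tasks are submitted in queue order by one master thread, the dispatcher's own monitor is the parent lock, and instead of a real per-entity Lock the "entity lock" is membership in a map of busy keys, whose ArrayDeque is the "other queue" for tasks that fail to get it. A deferred task is later run by the thread that already holds its key, so nothing blocks while holding the parent lock. KeyedDeferringDispatcher, submit, drain and demo are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Sketch: "this" is the parent lock; a key present in busy counts as a held entity lock. */
class KeyedDeferringDispatcher<K> {
    private final ExecutorService pool;
    private final Map<K, Queue<Runnable>> busy = new HashMap<>(); // guarded by this

    KeyedDeferringDispatcher(ExecutorService pool) {
        this.pool = pool;
    }

    /** Call in queue order, e.g. from the single thread draining the master queue. */
    void submit(K key, Runnable task) {
        synchronized (this) {
            Queue<Runnable> deferred = busy.get(key);
            if (deferred != null) {      // "try-lock" failed: park the task behind the running one
                deferred.add(task);
                return;
            }
            busy.put(key, new ArrayDeque<>()); // "entity lock" acquired for this key
        }
        pool.execute(() -> drain(key, task)); // started outside the parent lock
    }

    /** Runs task, then any tasks deferred for the same key, in FIFO order. */
    private void drain(K key, Runnable task) {
        Runnable current = task;
        while (current != null) {
            try {
                current.run();
            } catch (RuntimeException e) {
                e.printStackTrace(); // placeholder error handling; later tasks still run
            }
            synchronized (this) {
                current = busy.get(key).poll();
                if (current == null) {
                    busy.remove(key); // release the key only when nothing is waiting on it
                }
            }
        }
    }

    /** Hypothetical demo: 100 alternating updates for Joe and Jane. */
    static Map<String, List<String>> demo() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        KeyedDeferringDispatcher<String> dispatcher = new KeyedDeferringDispatcher<>(pool);
        Map<String, List<String>> applied = new ConcurrentHashMap<>();
        for (int i = 0; i < 100; i++) { // this loop plays the master thread
            String name = (i % 2 == 0) ? "Joe" : "Jane";
            String update = "update-" + i;
            dispatcher.submit(name, () -> applied
                    .computeIfAbsent(name, k -> Collections.synchronizedList(new ArrayList<>()))
                    .add(update));
        }
        // Safe to shut down here: drain() never resubmits, so deferred tasks
        // run inside jobs the pool has already accepted.
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return applied;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo().get("Joe").subList(0, 3)); // prints [update-0, update-2, update-4]
    }
}
```

Unlike the sub-queue approach, no key is pinned to a particular thread, so any idle pool thread can pick up the next key. A single busy key is still handled by one thread at a time, which the ordering requirement demands anyway.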