 

How can I multithread a queue consumer when the order of some transactions matters?

I have a queue of tasks that operate on a collection of objects (let's say the objects are entries in an address book, for the sake of example).

An example task might be "Update Joe's phone number to 888-555-1212".

It's possible to have multiple "Update Joe's phone number..." tasks in the queue simultaneously, but with different phone numbers. In this case, the updates must be applied in order to ensure the state is correct at the end (and no, for the sake of argument, it is not possible to put timestamps on the tasks and timestamps on the address book entries and throw away stale tasks).

It is safe to apply an update for Jane out-of-order with an update for Joe.

I would like to multithread processing of the queue, but I need to synchronize access by person.

Is there a handy library for this kind of thing? Or am I relegated to using an Executor and doing my own synchronization on "name" in the Runnable's run() method?

asked Aug 20 '13 18:08 by Jared




1 Answer

A straightforward, but not quite perfect, solution to this problem is to maintain an array of sub queues, one per processing thread. A single master thread pulls items off your main queue and adds each one to the sub queue whose index is the object key's hashCode modulo the number of sub queues (the key being whatever identifies and relates your tasks, such as the person's name).

E.g.

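// hashCode() may be negative, so plain % could produce a negative index; Math.floorMod cannot.
int queueIndex = Math.floorMod(myEntity.getKey().hashCode(), queues.length);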

Only one thread processes each sub queue, and all tasks for the same entity are submitted to the same sub queue, so tasks for an entity are applied in order with no race conditions.
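A minimal Java sketch of the sub-queue idea, assuming each sub queue and its single consumer thread can be modeled as one `Executors.newSingleThreadExecutor()`. The class name `KeyShardedDispatcher` and its methods are hypothetical; the caller of `submit` plays the role of the master thread, and `Math.floorMod` is used because `hashCode()` can be negative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: routes each task to one of N single-threaded executors chosen by its key. */
public class KeyShardedDispatcher {
    private final List<ExecutorService> shards = new ArrayList<>();

    public KeyShardedDispatcher(int threads) {
        for (int i = 0; i < threads; i++) {
            // Each single-thread executor owns one FIFO sub queue and exactly one worker thread.
            shards.add(Executors.newSingleThreadExecutor());
        }
    }

    /** Math.floorMod keeps the index in [0, n) even when hashCode() is negative. */
    public int shardFor(Object key) {
        return Math.floorMod(key.hashCode(), shards.size());
    }

    /** Plays the master thread: every task for a given key lands on the same sub queue. */
    public void submit(Object key, Runnable task) {
        shards.get(shardFor(key)).execute(task);
    }

    /** Stops accepting tasks and waits (up to a minute per shard) for queued tasks to finish. */
    public void shutdownAndWait() {
        shards.forEach(ExecutorService::shutdown);
        try {
            for (ExecutorService s : shards) {
                s.awaitTermination(1, TimeUnit.MINUTES);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        Map<String, String> phoneBook = new ConcurrentHashMap<>();
        KeyShardedDispatcher dispatcher = new KeyShardedDispatcher(4);
        dispatcher.submit("Joe", () -> phoneBook.put("Joe", "888-555-0001"));
        dispatcher.submit("Jane", () -> phoneBook.put("Jane", "888-555-0002"));
        dispatcher.submit("Joe", () -> phoneBook.put("Joe", "888-555-1212"));
        dispatcher.shutdownAndWait();
        System.out.println(phoneBook.get("Joe")); // prints 888-555-1212
    }
}
```

Both Joe updates go to the same single-threaded executor, so they run in submission order, while Jane's update may run in parallel on another shard.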

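This solution is imperfect because some threads may end up with longer queues than others (for example, if one entity receives most of the updates, the thread owning its sub queue does most of the work). In practice this is unlikely to matter, but it is worth considering.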

Issues with the simpler solution:

The simpler solution of pulling items off of a single queue and then locking on something distinct for the affected entity has a race condition (as Aurand pointed out). Given:

Master Queue [ Task1(entity1), Task2(entity1), ... ]

Where task1 and task2 both edit the same entity, entity1, and two threads, thread1 and thread2, are consuming the queue, the expected / desired sequence of events is:

  • Thread1 takes task1
  • Thread1 locks on entity1
  • Thread1 edits entity1
  • Thread1 unlocks entity1
  • Thread2 takes task2
  • Thread2 locks entity1
  • Thread2 edits entity1
  • Thread2 unlocks entity1

Unfortunately, even if the lock is the first statement of the thread's run method, it is possible for the following sequence to occur:

  • Thread1 takes task1
  • Thread2 takes task2
  • Thread2 locks entity1
  • Thread2 edits entity1
  • Thread2 unlocks entity1
  • Thread1 locks entity1
  • Thread1 edits entity1
  • Thread1 unlocks entity1
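The bad interleaving above can be reproduced deterministically. In this hypothetical sketch (class, field and latch names are illustrative), two `CountDownLatch`es stand in for the scheduler: Thread1 takes task1 and then stalls until Thread2 has taken and applied task2.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;

/** Hypothetical demo: both threads take a task before either locks entity1. */
public class LockAfterTakeRace {
    static final class Task {
        final String entity;
        final String phone;
        Task(String entity, String phone) { this.entity = entity; this.phone = phone; }
    }

    private static final Object entity1Lock = new Object();
    private static String entity1Phone; // guarded by entity1Lock

    /** Locking is the very first step once a task is in hand, yet order is still lost. */
    private static void apply(Task t) {
        synchronized (entity1Lock) {
            entity1Phone = t.phone;
        }
    }

    public static String run() {
        BlockingQueue<Task> master = new ArrayBlockingQueue<>(2);
        master.add(new Task("entity1", "111-111-1111")); // task1, queued first
        master.add(new Task("entity1", "222-222-2222")); // task2, queued second
        CountDownLatch task1Taken = new CountDownLatch(1);
        CountDownLatch task2Applied = new CountDownLatch(1);

        Thread thread1 = new Thread(() -> {
            try {
                Task t = master.take();   // Thread1 takes task1
                task1Taken.countDown();
                task2Applied.await();     // simulate Thread1 being descheduled here
                apply(t);                 // Thread1 locks, edits, unlocks entity1
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread thread2 = new Thread(() -> {
            try {
                task1Taken.await();
                Task t = master.take();   // Thread2 takes task2
                apply(t);                 // Thread2 locks, edits, unlocks entity1
                task2Applied.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        thread1.start();
        thread2.start();
        try {
            thread1.join();
            thread2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        synchronized (entity1Lock) {
            return entity1Phone;
        }
    }

    public static void main(String[] args) {
        // The stale number wins: task1 is applied last even though task2 was queued after it.
        System.out.println(run()); // prints 111-111-1111
    }
}
```

Even though each thread locks entity1 immediately after taking its task, the older phone number is what ends up stored.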

To avoid this, each thread would have to lock on something (say, the queue) before taking a task, and then acquire the lock on the entity while still holding that parent lock. However, you do not want to block everything while holding the parent lock and waiting for the entity lock, so you can only try to acquire the entity lock (for example, with ReentrantLock.tryLock()) and must handle the case where that fails (perhaps by putting the task into another queue). Overall, the situation becomes non-trivial.
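One way to make the "put it into another queue" idea concrete is sketched below. This is an assumption, not the answer's exact design: instead of a real per-entity lock, a map of busy entities guarded by the parent lock plays the role of `tryLock`, and a task whose entity is busy is deferred onto that entity's own queue, which the owning worker drains in order before releasing the entity. Names such as `OrderedByKeyProcessor`, `takeNext` and `finish` are hypothetical, and the workers run in batch mode, exiting once the master queue is empty.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch. This object's monitor is the "parent lock": it guards the master queue
 * and 'busy'. A key present in 'busy' means a worker owns that entity; its deque holds deferred tasks.
 */
public class OrderedByKeyProcessor {
    public static final class Task {
        final String key;
        final Runnable action;
        public Task(String key, Runnable action) { this.key = key; this.action = action; }
    }

    private final Deque<Task> master = new ArrayDeque<>();
    private final Map<String, Deque<Task>> busy = new HashMap<>();

    public synchronized void enqueue(Task t) {
        master.addLast(t);
    }

    /** Under the parent lock, returns the next task whose entity was free, or null if none remain. */
    private synchronized Task takeNext() {
        Task t;
        while ((t = master.pollFirst()) != null) {
            Deque<Task> waiting = busy.get(t.key);
            if (waiting == null) {        // the "try for the entity lock" succeeded
                busy.put(t.key, new ArrayDeque<>());
                return t;
            }
            waiting.addLast(t);           // entity busy: defer, preserving queue order
        }
        return null;
    }

    /** Under the parent lock, hands back the entity's next deferred task or releases the entity. */
    private synchronized Task finish(String key) {
        Task next = busy.get(key).pollFirst();
        if (next == null) {
            busy.remove(key);
        }
        return next;
    }

    /** Tasks run outside the parent lock, so a slow update never blocks other entities. */
    private void workerLoop() {
        Task t;
        while ((t = takeNext()) != null) {
            while (t != null) {           // drain this entity's deferred tasks in order
                t.action.run();
                t = finish(t.key);
            }
        }
    }

    /** Batch mode: workers exit once the master queue is empty and their entity is drained. */
    public void runWith(int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.execute(this::workerLoop);
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        OrderedByKeyProcessor p = new OrderedByKeyProcessor();
        Map<String, List<Integer>> applied = new ConcurrentHashMap<>();
        for (int i = 0; i < 100; i++) {
            String key = (i % 3 == 0) ? "Jane" : "Joe";
            int seq = i;
            p.enqueue(new Task(key, () -> applied
                .computeIfAbsent(key, k -> Collections.synchronizedList(new ArrayList<>()))
                .add(seq)));
        }
        p.runWith(4);
        System.out.println(applied.get("Jane").subList(0, 4)); // prints [0, 3, 6, 9]
    }
}
```

The parent lock is held only while moving tasks between queues, never while an update runs, so a slow update for Joe does not stall Jane's as long as another worker is free.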

answered Oct 28 '22 19:10 by Trevor Freeman
