
Message queue architecture when messages need to access shared data

I have to build a motion detection service. The motion detection doesn't operate on videos, but instead on just still images.

This microservice will need to be able to receive images out of order (with a timestamp) and figure out if the image differs from an image taken before it (with a timestamp earlier than it). There will need to be multiple motion detection workers.

So, key requirements seem to be:

  1. A web service that takes in images out of order, is able to group them into previous and next pairs, and then compute whether an image has motion compared to its previous image.
  2. Many image producers; throughput seems to be, on average, around 100 images a second
  3. Many motion detection consumers
  4. Prioritise latency over throughput.
  5. Tasks that aren't easily independently consumable.

I was thinking of using a single message queue. The producers push image documents onto the queue. The motion detection workers then read from this queue, and add a 'diff_percentage' field to that document, and update that record in the database.

Given a task in the queue, a worker can operate on that task alone, by fetching the image before it directly from the database, and comparing it, then updating the record in the database. Unfortunately, whilst that would work well enough, it would be horribly slow. I think I need to reduce reads from the database. Ideally I'd like this "queue" to be able to buffer until it has the images needed by a given job. Something like... when a worker reads from the queue, check if the image it needs to compare against is in the queue, if not, go to the database.
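To make the cost concrete, here is roughly what that naive per-task worker looks like, with a toy in-memory `FakeDB` and a byte-level `diff` standing in for the real database and image comparison (both are illustrative stand-ins, not actual code from my system). Note the two database round-trips for every single task:

```python
from typing import Optional

def diff(a: bytes, b: bytes) -> float:
    """Toy stand-in for the real image comparison: fraction of differing bytes."""
    n = max(len(a), len(b))
    return sum(x != y for x, y in zip(a.ljust(n, b"\0"), b.ljust(n, b"\0"))) / n

class FakeDB:
    """In-memory stand-in for the image store (illustration only)."""
    def __init__(self):
        self.images = {}  # timestamp -> record

    def insert(self, rec):
        self.images[rec["ts"]] = rec

    def fetch_image_before(self, ts) -> Optional[dict]:
        earlier = [t for t in self.images if t < ts]
        return self.images[max(earlier)] if earlier else None

    def update(self, rec):
        self.images[rec["ts"]] = rec

def handle_task(task: dict, db: FakeDB) -> None:
    """Naive worker: one DB read and one DB write for every single task."""
    prev = db.fetch_image_before(task["ts"])  # round-trip 1
    if prev is None:
        return  # no earlier image yet
    task["diff_percentage"] = diff(prev["data"], task["data"])
    db.update(task)                           # round-trip 2
```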

Can anyone point me in the right direction? Perhaps a queue is not what I want? Perhaps a queue, and some sort of caching bucket?

Dominic Bou-Samra asked Jul 24 '14 22:07

2 Answers

An image queue with multiple producers and multiple consumers does seem the right approach here. For the remainder of this answer I'll abstract away from the specifics of this queue because those depend on where the producers and consumers are (physically on which machine).

Here's what to do at the consumer end:

Keep the images temporarily in memory in a hash table. The key is the timestamp and the value is a pointer to the image content (as well as any metadata that you may want to keep). An image can be removed from the hash table as soon as it has been compared with the image of the successive timestamp.

Your consumer machine will need to have enough working memory for the images. If on average 100 images are received between a given timestamp and the timestamp either directly before or directly after it, and an image is 1MB in size on average, the images will together take 100 * 2 * 1MB = 200MB of memory.

Create a second, in-memory queue to keep track of images that can't be compared yet. Workers put image pointers with their timestamps onto that queue if the image with the previous timestamp is not available from the hash table at the time of receiving the image with the current timestamp. A second set of workers takes timestamps from this queue and tests whether the image of the previous timestamp has become available in the meanwhile. If so, it compares the images, otherwise it pushes the image and timestamp back onto the queue.
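The two worker roles might look like this, assuming each task carries the timestamp of its predecessor in an assumed `prev_ts` field, and using a plain dict for the table (the manager indirection described further down is omitted here for clarity):

```python
from collections import deque

def compare(a: bytes, b: bytes) -> float:
    """Toy comparison: fraction of differing bytes (placeholder for a real diff)."""
    return sum(x != y for x, y in zip(a, b)) / max(len(a), len(b))

def primary_step(task, table, retry_queue):
    """First-set worker: compare right away if the predecessor is already in
    the hash table, otherwise defer the task to the retry queue."""
    prev = table.pop(task["prev_ts"], None)
    if prev is None:
        retry_queue.append(task)          # predecessor not buffered yet
        return None
    return compare(prev, task["image"])

def retry_step(table, retry_queue):
    """Second-set worker: re-test one deferred task; it pushes the task
    back onto the queue if the predecessor still has not arrived."""
    task = retry_queue.popleft()
    return primary_step(task, table, retry_queue)
```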

The relative size of the first and second set of workers should be proportional to the relative frequency of an image arriving before its direct successor. In other words, if 60% of the time an image enters the hash table before its direct successor (so 40% of the time an image arrives after its direct successor), 60% of the workers should be in the first set and 40% should be in the second set. Alternatively, you may assign workers to a set dynamically depending on demand; this may be appropriate if out-of-order behaviour tends to fluctuate a lot, for example depending on the time of the day.

A third queue with a single consumer is responsible for updating the database. This third queue may or may not be across a network, just like the first queue. After a worker from the previous two sets has compared two successive images, it pushes the result onto this third queue. The consumer of this queue takes the contents of the queue and synchronizes them to the database. It may do this with one transaction for every few (say 10) comparisons, to minimize latency, or pool everything in a single transaction per second, to maximize throughput. Do not create a transaction for each image comparison individually; that will likely be much slower than you want.
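A sketch of that single batching consumer, with a counting `FakeDB` standing in for the real database handle (the `transaction`/`set_diff` interface is assumed for illustration):

```python
import queue
from contextlib import contextmanager

class FakeDB:
    """Stand-in DB that counts transactions (illustration only)."""
    def __init__(self):
        self.rows = {}
        self.commits = 0

    @contextmanager
    def transaction(self):
        yield
        self.commits += 1

    def set_diff(self, ts, pct):
        self.rows[ts] = pct

def flush(db, batch):
    with db.transaction():              # one round-trip for the whole batch
        for ts, diff_pct in batch:
            db.set_diff(ts, diff_pct)

def db_writer(results: "queue.Queue", db, batch_size: int = 10):
    """Single consumer of the results queue: commits one transaction per
    batch_size comparisons instead of one per comparison."""
    batch = []
    while True:
        item = results.get()
        if item is None:                # shutdown sentinel: flush and stop
            break
        batch.append(item)
        if len(batch) >= batch_size:
            flush(db, batch)
            batch = []
    if batch:
        flush(db, batch)
```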

The image comparison workers all need to read and update the hash table, so you need a mechanism to prevent race conditions. A locking mechanism is inappropriate in this case because it will probably become the bottleneck of your application. Instead, dedicate a single worker to managing the hash table and have all comparison workers send requests to the hash table manager over a read/insert queue. Since the manager has a relatively light job (storing, retrieving and removing image pointers) it should be able to stay ahead of the read/insert queue most of the time.

When a worker does a read request, it will wait for the reply of the manager (not when it does an insert request). It might pass a callback and sleep, or enter a spinlock checking for the "reply ready" value of a shared variable (depending on your programming environment this may boil down to the same thing under the hood). Of course you would rather not have your workers wait at all, but most waits will be very brief and this approach will certainly be faster than a global locking approach.
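Here is a minimal shape for the manager and the worker-side read, using Python's standard `queue.Queue` for the request channel and a one-slot reply queue per read (in Python the queue already does its own internal locking, so this is purely to illustrate the single-owner pattern; in other environments the request channel would be whatever lock-free or message-passing primitive you have):

```python
import queue
import threading

def table_manager(requests: "queue.Queue"):
    """Single owner of the hash table: serialises all reads and inserts,
    so comparison workers never touch the table directly."""
    table = {}
    while True:
        req = requests.get()
        if req is None:
            break                           # shutdown sentinel
        op, ts, payload, reply = req
        if op == "insert":
            table[ts] = payload             # fire-and-forget: no reply
        elif op == "read":
            reply.put(table.pop(ts, None))  # retrieve-and-remove, then answer

def read(requests, ts):
    """Worker-side read: send a request and wait for the manager's reply."""
    reply = queue.Queue(maxsize=1)
    requests.put(("read", ts, None, reply))
    return reply.get()
```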

After an image is first successfully retrieved from the hash table, the manager can remove the image from the table (because the image will only ever be requested for comparison with the successive image). The manager should remove the pointer from the hash table, not delete the image itself. You can use reference counting to determine when an image should be completely purged from memory. While the reference counting needs to be locked or atomic, this will not be a bottleneck since at most two workers will be accessing an image at any given time, mostly without directly affecting the reference count.
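The reference counting can be as small as this (a lock-guarded counter here; in languages with atomics you would use an atomic integer instead):

```python
import threading

class RefCountedImage:
    """Image buffer purged only when the last holder releases it. The count
    is the only shared state, so a tiny lock (or an atomic) suffices; at
    most two workers touch a given image at any time."""

    def __init__(self, data: bytes, refs: int = 2):
        self.data = data
        self._refs = refs
        self._lock = threading.Lock()

    def release(self) -> bool:
        """Drop one reference; return True once the image can be freed."""
        with self._lock:
            self._refs -= 1
            if self._refs == 0:
                self.data = None  # let the buffer be reclaimed
                return True
            return False
```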

Notes

In the above design I have not discussed when images enter the permanent database. Most likely this should happen on the producer end, before images enter the first queue. Alternatively, the database synchronizing agent from the third queue that I discussed may do it. You don't want to burden your comparison workers or your hash table manager with this responsibility.

If you think my answer is promising, I'm willing to provide additional documentation, for example (minimalist) flow diagrams, pseudocode algorithms for the workers or crude dataflow traffic profiles.

Julian answered Oct 04 '22 19:10


I think your problem comes from trying to use a single queue for many image producers. This lumps the images together and then forces you to untangle the image sequences afterwards.

My approach would be to stream the images into timestamped directories and files. Don't mix the image producers, keep them separate. Then it's very easy to scan through the files and mark them with a diff_percentage.

For example, Image Producer #1 stores files in directory /IP1/date/time/sequence, where date is like 20140727 (2014-07-27), time is like 1542 (3:42pm) and sequence is a counter from 1 to 6000 (up to 100 frames per second, 60 seconds per minute). Copy this structure for other Image Producers. This way, the images get stored independently of the workers and don't have a bottleneck with a queue or database.
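Building that path is a one-liner (sketch; the function name is mine):

```python
from datetime import datetime
from pathlib import Path

def frame_path(producer: str, ts: datetime, sequence: int) -> Path:
    """Build the per-producer path <producer>/<date>/<time>/<sequence>
    described above, e.g. IP1/20140727/1542/37."""
    return Path(producer) / ts.strftime("%Y%m%d") / ts.strftime("%H%M") / str(sequence)
```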

Then, run as many consumers as you want in parallel that can wake up, process a chunk of images and go to sleep when they run out of files to process. They need to work independently of each other, which you get by having them work on separate directories. I would have them add files to the directories with a diff_percentage for each image, and also another file when a directory (time or date) is complete. This makes it easy for them to restart and catch up in case they stop unexpectedly.

This is the old "divide and conquer" approach. By keeping the image streams separate, it is easier to divide up the work between the consumer processes.

Brent Washburne answered Oct 04 '22 18:10