
Dealing with duplication in a message queue

I've been arguing with my programmer about the best way of going about this. We have data that comes in at a rate of about 10000 objects per second. This needs to be processed asynchronously, but loose ordering is sufficient, so each object is inserted round-robin-ly into one of several message queues (there are also several producers and consumers). Each object is ~300 bytes. And it needs to be durable, so the MQs are configured to persist to disk.

The problem is that often these objects are duplicated (as in they are unavoidably duplicated in the data that comes in to the producer). They do have 10-byte unique ids. It's not catastrophic if objects are duplicated in the queue, but it is if they're duplicated in the processing after being taken from the queue. What's the best way to go about ensuring as close as possible to linear scalability whilst ensuring there's no duplication in the processing of the objects? And perhaps linked to that, should the whole object be stored in the message queue, or only the id with the body stored in something like cassandra?

Thank you!

Edit: Confirmed where the duplication occurs. Also, so far I've had 2 recommendations for Redis. I'd previously been considering RabbitMQ. What are the pros and cons of each with regards to my requirements?

Max asked Jun 28 '11 01:06



2 Answers

P.S.: This is the first time in my life that I've seen the Redis website having problems, but I bet they will have solved it by the time you visit.

> We have data that comes in at a rate
> of about 10000 objects per second.
> This needs to be processed
> asynchronously, but loose ordering is
> sufficient, so each object is inserted
> round-robin-ly into one of several
> message queues (there are also several
> producers and consumers)

My first advice would be to look at Redis, because it is insanely fast and I bet you can handle all your messages with only a single message queue.

First, I'd like to show you some information about my laptop (I like it, but a big server is going to be a lot faster ;)). My dad recently bought a new PC, and it beats my laptop hard (8 CPUs instead of 2).

-Computer-
Processor       : 2x Intel(R) Core(TM)2 Duo CPU     T7100  @ 1.80GHz
Memory      : 2051MB (1152MB used)
Operating System        : Ubuntu 10.10
User Name       : alfred (alfred)
-Display-
Resolution      : 1920x1080 pixels
OpenGL Renderer     : Unknown
X11 Vendor      : The X.Org Foundation
-Multimedia-
Audio Adapter       : HDA-Intel - HDA Intel
-Input Devices-
 Power Button
 Lid Switch
 Sleep Button
 Power Button
 AT Translated Set 2 keyboard
 Microsoft Comfort Curve Keyboard 2000
 Microsoft Comfort Curve Keyboard 2000
 Logitech Trackball
 Video Bus
 PS/2 Logitech Wheel Mouse
-SCSI Disks-
HL-DT-ST DVDRAM GSA-T20N
ATA WDC WD1600BEVS-2

Below are the results of running redis-benchmark on my machine, without even doing much Redis optimization:

alfred@alfred-laptop:~/database/redis-2.2.0-rc4/src$ ./redis-benchmark 
====== PING (inline) ======
  10000 requests completed in 0.22 seconds
  50 parallel clients
  3 bytes payload
  keep alive: 1

94.84% <= 1 milliseconds
98.74% <= 2 milliseconds
99.65% <= 3 milliseconds
100.00% <= 4 milliseconds
46296.30 requests per second

====== PING ======
  10000 requests completed in 0.22 seconds
  50 parallel clients
  3 bytes payload
  keep alive: 1

91.30% <= 1 milliseconds
98.21% <= 2 milliseconds
99.29% <= 3 milliseconds
99.52% <= 4 milliseconds
100.00% <= 4 milliseconds
45662.10 requests per second

====== MSET (10 keys) ======
  10000 requests completed in 0.32 seconds
  50 parallel clients
  3 bytes payload
  keep alive: 1

3.45% <= 1 milliseconds
88.55% <= 2 milliseconds
97.86% <= 3 milliseconds
98.92% <= 4 milliseconds
99.80% <= 5 milliseconds
99.94% <= 6 milliseconds
99.95% <= 9 milliseconds
99.96% <= 10 milliseconds
100.00% <= 10 milliseconds
30864.20 requests per second

====== SET ======
  10000 requests completed in 0.21 seconds
  50 parallel clients
  3 bytes payload
  keep alive: 1

92.45% <= 1 milliseconds
98.78% <= 2 milliseconds
99.00% <= 3 milliseconds
99.01% <= 4 milliseconds
99.53% <= 5 milliseconds
100.00% <= 5 milliseconds
47169.81 requests per second

====== GET ======
  10000 requests completed in 0.21 seconds
  50 parallel clients
  3 bytes payload
  keep alive: 1

94.50% <= 1 milliseconds
98.21% <= 2 milliseconds
99.50% <= 3 milliseconds
100.00% <= 3 milliseconds
47619.05 requests per second

====== INCR ======
  10000 requests completed in 0.23 seconds
  50 parallel clients
  3 bytes payload
  keep alive: 1

91.90% <= 1 milliseconds
97.45% <= 2 milliseconds
98.59% <= 3 milliseconds
99.51% <= 10 milliseconds
99.78% <= 11 milliseconds
100.00% <= 11 milliseconds
44444.45 requests per second

====== LPUSH ======
  10000 requests completed in 0.21 seconds
  50 parallel clients
  3 bytes payload
  keep alive: 1

95.02% <= 1 milliseconds
98.51% <= 2 milliseconds
99.23% <= 3 milliseconds
99.51% <= 5 milliseconds
99.52% <= 6 milliseconds
100.00% <= 6 milliseconds
47619.05 requests per second

====== LPOP ======
  10000 requests completed in 0.21 seconds
  50 parallel clients
  3 bytes payload
  keep alive: 1

95.89% <= 1 milliseconds
98.69% <= 2 milliseconds
98.96% <= 3 milliseconds
99.51% <= 5 milliseconds
99.98% <= 6 milliseconds
100.00% <= 6 milliseconds
47619.05 requests per second

====== SADD ======
  10000 requests completed in 0.22 seconds
  50 parallel clients
  3 bytes payload
  keep alive: 1

91.08% <= 1 milliseconds
97.79% <= 2 milliseconds
98.61% <= 3 milliseconds
99.25% <= 4 milliseconds
99.51% <= 5 milliseconds
99.81% <= 6 milliseconds
100.00% <= 6 milliseconds
45454.55 requests per second

====== SPOP ======
  10000 requests completed in 0.22 seconds
  50 parallel clients
  3 bytes payload
  keep alive: 1

91.88% <= 1 milliseconds
98.64% <= 2 milliseconds
99.09% <= 3 milliseconds
99.40% <= 4 milliseconds
99.48% <= 5 milliseconds
99.60% <= 6 milliseconds
99.98% <= 11 milliseconds
100.00% <= 11 milliseconds
46296.30 requests per second

====== LPUSH (again, in order to bench LRANGE) ======
  10000 requests completed in 0.23 seconds
  50 parallel clients
  3 bytes payload
  keep alive: 1

91.00% <= 1 milliseconds
97.82% <= 2 milliseconds
99.01% <= 3 milliseconds
99.56% <= 4 milliseconds
99.73% <= 5 milliseconds
99.77% <= 7 milliseconds
100.00% <= 7 milliseconds
44247.79 requests per second

====== LRANGE (first 100 elements) ======
  10000 requests completed in 0.39 seconds
  50 parallel clients
  3 bytes payload
  keep alive: 1

6.24% <= 1 milliseconds
75.78% <= 2 milliseconds
93.69% <= 3 milliseconds
97.29% <= 4 milliseconds
98.74% <= 5 milliseconds
99.45% <= 6 milliseconds
99.52% <= 7 milliseconds
99.93% <= 8 milliseconds
100.00% <= 8 milliseconds
25906.74 requests per second

====== LRANGE (first 300 elements) ======
  10000 requests completed in 0.78 seconds
  50 parallel clients
  3 bytes payload
  keep alive: 1

1.30% <= 1 milliseconds
5.07% <= 2 milliseconds
36.42% <= 3 milliseconds
72.75% <= 4 milliseconds
93.26% <= 5 milliseconds
97.36% <= 6 milliseconds
98.72% <= 7 milliseconds
99.35% <= 8 milliseconds
100.00% <= 8 milliseconds
12886.60 requests per second

====== LRANGE (first 450 elements) ======
  10000 requests completed in 1.10 seconds
  50 parallel clients
  3 bytes payload
  keep alive: 1

0.67% <= 1 milliseconds
3.64% <= 2 milliseconds
8.01% <= 3 milliseconds
23.59% <= 4 milliseconds
56.69% <= 5 milliseconds
76.34% <= 6 milliseconds
90.00% <= 7 milliseconds
96.92% <= 8 milliseconds
98.55% <= 9 milliseconds
99.06% <= 10 milliseconds
99.53% <= 11 milliseconds
100.00% <= 11 milliseconds
9066.18 requests per second

====== LRANGE (first 600 elements) ======
  10000 requests completed in 1.48 seconds
  50 parallel clients
  3 bytes payload
  keep alive: 1

0.85% <= 1 milliseconds
9.23% <= 2 milliseconds
11.03% <= 3 milliseconds
15.94% <= 4 milliseconds
27.55% <= 5 milliseconds
41.10% <= 6 milliseconds
56.23% <= 7 milliseconds
78.41% <= 8 milliseconds
87.37% <= 9 milliseconds
92.81% <= 10 milliseconds
95.10% <= 11 milliseconds
97.03% <= 12 milliseconds
98.46% <= 13 milliseconds
99.05% <= 14 milliseconds
99.37% <= 15 milliseconds
99.40% <= 17 milliseconds
99.67% <= 18 milliseconds
99.81% <= 19 milliseconds
99.97% <= 20 milliseconds
100.00% <= 20 milliseconds
6752.19 requests per second

As you can hopefully see from benchmarking my simple laptop, you probably need just one message queue, because Redis handled 10000 LPUSH requests in 0.23 seconds and 10000 LPOP requests in 0.21 seconds. If you need only one queue, I believe your problem goes away (or are the producers themselves producing duplicates? I don't completely understand that part).
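To put those numbers next to the stated load, here is a rough back-of-the-envelope calculation. It only reuses figures from the benchmark output above and the 10000 objects per second from the question; the function and variable names are made up for illustration. Keep in mind the benchmark used 3-byte payloads, not ~300-byte objects, so this is an optimistic estimate rather than a prediction.

```python
def requests_per_second(requests: int, seconds: float) -> float:
    """Throughput implied by 'N requests completed in S seconds'."""
    return requests / seconds


REQUIRED_RATE = 10_000  # objects per second, taken from the question

# Figures quoted from the benchmark run above.
lpush_rate = requests_per_second(10_000, 0.23)
lpop_rate = requests_per_second(10_000, 0.21)

# How many times the required rate each operation reached on its own.
lpush_headroom = lpush_rate / REQUIRED_RATE
lpop_headroom = lpop_rate / REQUIRED_RATE

print(f"LPUSH: {lpush_rate:.0f} req/s, {lpush_headroom:.1f}x the required rate")
print(f"LPOP:  {lpop_rate:.0f} req/s, {lpop_headroom:.1f}x the required rate")
```

Pushes and pops would share the same server in practice, so the combined headroom is lower than either figure alone.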

> And it needs to be durable, so the MQs
> are configured to persist to disk.

Redis also persists to disk.

> The problem is that often these
> objects are duplicated. They do have
> 10-byte unique ids. It's not
> catastrophic if objects are duplicated
> in the queue, but it is if they're
> duplicated in the processing after
> being taken from the queue. What's the
> best way to go about ensuring as close
> as possible to linear scalability
> whilst ensuring there's no duplication
> in the processing of the objects?

When using a single message queue (box), this problem does not exist, if I understand correctly. If it does, you could simply check whether the id is a member of a set of ids. First, of course, you add each id to that set using SADD; then, when you process an object, you remove its id from the set.
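A minimal sketch of that set-based check, assuming a single box. To keep it runnable without a server, `InMemoryStore` is a hypothetical stand-in that imitates only the behaviour relied on here: like Redis SADD with one member, `sadd` returns 1 when the id is new and 0 when it was already present. The helper names `enqueue` and `process_next` are also made up for illustration.

```python
from collections import deque


class InMemoryStore:
    """Hypothetical stand-in for one Redis box: a set of ids plus a queue."""

    def __init__(self):
        self.ids = set()
        self.queue = deque()

    def sadd(self, member) -> int:
        # Imitates SADD with a single member: 1 if newly added, 0 if present.
        if member in self.ids:
            return 0
        self.ids.add(member)
        return 1

    def srem(self, member) -> int:
        # Imitates SREM with a single member: 1 if removed, 0 if absent.
        if member in self.ids:
            self.ids.remove(member)
            return 1
        return 0


def enqueue(store, obj_id, body) -> bool:
    """Queue the object only if its id is not already waiting."""
    if store.sadd(obj_id) == 0:
        return False  # duplicate of something still in the queue
    store.queue.append((obj_id, body))
    return True


def process_next(store, handler) -> bool:
    """Take one object (FIFO), handle it, then forget its id."""
    if not store.queue:
        return False
    obj_id, body = store.queue.popleft()
    handler(obj_id, body)
    store.srem(obj_id)
    return True


store = InMemoryStore()
processed = []
for obj_id, body in [("a1", "x"), ("b2", "y"), ("a1", "x")]:
    enqueue(store, obj_id, body)
while process_next(store, lambda i, b: processed.append(i)):
    pass
print(processed)  # ['a1', 'b2']
```

Because the id is removed once the object has been processed, a duplicate that arrives later would be accepted again; if that matters, the ids would have to be kept for longer.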

If one box no longer scales, you should shard your keys over multiple boxes and check each key on the box that owns it. To learn more about this, I think you should read the following links:

  • Redis replication and redis sharding (cluster) difference
  • http://antirez.com/post/redis-presharding.html
  • http://redis.io/presentation/Redis_Cluster.pdf
  • http://blog.zawodny.com/2011/02/26/redis-sharding-at-craigslist/
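As a rough illustration of "check that key on that box", the sketch below maps an id to one box with a stable hash, so every producer and consumer agrees on where a given id lives. The box names and the `box_for` helper are hypothetical, and plain modulo hashing is just the simplest possible choice, not what the links above prescribe.

```python
import zlib

# Hypothetical box names; in practice these would be host:port pairs.
BOXES = ["redis-box-0", "redis-box-1", "redis-box-2"]


def box_for(obj_id: bytes, boxes=BOXES) -> str:
    """Pick the box that owns this id.

    CRC32 gives the same value in every process, so all producers and
    consumers route a given id to the same box.
    """
    return boxes[zlib.crc32(obj_id) % len(boxes)]


# The same 10-byte id always lands on the same box, so the duplicate
# check for that id only ever has to look at one set.
print(box_for(b"0123456789") == box_for(b"0123456789"))  # True
```

With plain modulo, changing the number of boxes remaps most ids, which is worth keeping in mind when choosing how many boxes to start with.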

> And perhaps linked to that, should the whole object be stored in the message queue, or only the id with the body stored in something like cassandra?

If possible, you should store all your information directly in memory, because nothing runs as fast as memory (okay, your CPU cache is even faster, but it is really small and you can't address it directly from your code). Redis keeps all your data in memory and writes snapshots to disk. I think you should be able to keep everything in memory and skip using something like Cassandra altogether.

Let's assume each object takes 400 bytes in total, at a rate of 10000 per second => 4000000 bytes of objects per second => 4 MB/s, if my calculation is correct. You could easily hold that amount of information in memory. If you can't, you should really consider adding memory if at all possible, because memory isn't that expensive anymore.
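The arithmetic above can be checked in a few lines. The 400 bytes and 10000 objects per second come from the paragraph above; the `backlog_mb` helper is an addition for illustration, showing how much memory a backlog would take if consumers fell behind by a given number of seconds.

```python
OBJECT_BYTES = 400           # assumed total size per object
OBJECTS_PER_SECOND = 10_000  # incoming rate from the question

bytes_per_second = OBJECT_BYTES * OBJECTS_PER_SECOND
mb_per_second = bytes_per_second / 1_000_000  # decimal megabytes


def backlog_mb(seconds_behind: float) -> float:
    """Memory needed to hold everything that arrived in that many seconds."""
    return mb_per_second * seconds_behind


print(bytes_per_second)  # 4000000
print(mb_per_second)     # 4.0
print(backlog_mb(60))    # 240.0
```

So the memory actually needed depends on how far behind the consumers are allowed to get, not only on the incoming rate.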

Alfred answered Nov 08 '22 13:11



Without knowing how the messages are created within the system, the mechanism the producer uses for publishing to the queue, and which queue system is in use, it's difficult to diagnose what's going on.

I've seen this scenario happen in a number of different ways: timed-out workers causing the message to become visible again in the queue (and thus processed a second time; this is common with Kestrel), misconfigured brokers (HA ActiveMQ comes to mind), misconfigured clients (Spring plus Camel routing comes to mind), clients double submitting, etc. There are many ways this kind of issue can come up.

Since I can't really diagnose the issue, I'll plug Redis here. You could easily combine something like SPOP (which is O(1), as is SADD) with pub/sub for an incredibly fast, constant-time, duplicate-free (sets must contain unique elements) queue. Although it's a Ruby project, Resque may be able to help. It's at least worth looking at.
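To illustrate why a set gives a duplicate-free queue, here is a small sketch that uses a plain Python set in place of a Redis set. `SetQueue` is a hypothetical name; its `add` plays the role of SADD and its `pop` the role of SPOP (removing and returning some member, in no particular order). The pub/sub part is left out.

```python
class SetQueue:
    """In-memory stand-in for a Redis set used as a duplicate-free queue."""

    def __init__(self):
        self._items = set()

    def add(self, obj_id) -> bool:
        # SADD-like: True if the id was new, False if it was already queued.
        before = len(self._items)
        self._items.add(obj_id)
        return len(self._items) > before

    def pop(self):
        # SPOP-like: remove and return some member, or None when empty.
        return self._items.pop() if self._items else None


q = SetQueue()
for obj_id in ["a1", "b2", "a1", "c3", "b2"]:
    q.add(obj_id)

taken = []
while (item := q.pop()) is not None:
    taken.append(item)

# Each id comes out exactly once; the order is not defined.
print(sorted(taken))  # ['a1', 'b2', 'c3']
```

A set has no ordering at all, which fits here only because the question says loose ordering is sufficient.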

Best of luck.

bmatheny answered Nov 08 '22 13:11
