I have a stream of requests in my RabbitMQ cluster, and multiple consumers handling them. The thing is - each consumer must handle requests in batches for performance reasons. Specifically, there is a network IO operation that I can amortize by batching requests.
So, each consumer would like to maximize the number of requests that it can batch, without adding too much latency.
I could potentially start a timer when a consumer receives the first request and keep collecting requests until one of two things happens - the timer expires, or 500 requests have been received.
Is there a better way to achieve this - without blocking each consumer?
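The timer-or-count idea from the question can be sketched without a blocking wait by keeping a deadline next to the pending batch and checking it from whatever periodic hook the client's event loop offers. The class below is only an illustration: Batcher, add, poll, max_size and max_wait are made-up names, and nothing here is RabbitMQ-specific.

```python
import time


class Batcher:
    """Collects items until max_size is reached or max_wait seconds have passed."""

    def __init__(self, max_size=500, max_wait=0.05, clock=time.monotonic):
        self.max_size = max_size
        self.max_wait = max_wait
        self.clock = clock          # injectable so the timing can be tested
        self.items = []
        self.deadline = None        # set when the first item of a batch arrives

    def add(self, item):
        """Add an item; return a full batch if the size limit was hit, else None."""
        if not self.items:
            # First item of a new batch starts the timer.
            self.deadline = self.clock() + self.max_wait
        self.items.append(item)
        if len(self.items) >= self.max_size:
            return self._take()
        return None

    def poll(self):
        """Return the pending batch if its timer has expired, else None."""
        if self.items and self.clock() >= self.deadline:
            return self._take()
        return None

    def _take(self):
        # Hand out the current batch and reset state for the next one.
        batch, self.items, self.deadline = self.items, [], None
        return batch
```

The delivery callback would call add() for each message, and a periodic timer in the same thread would call poll(), so the consumer never sleeps while waiting for a batch to fill.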
When a consumer uses a Basic.Get request to retrieve messages, it must send a new request each time it wants to receive a message, even if there are multiple messages in the queue. If the queue you're retrieving a message from has a message pending when issuing a Basic.Get, RabbitMQ responds with a Basic.GetOk RPC response (figure 5.2).
RabbitMQ can send a single ack for thousands of messages by setting the "multiple" flag on the ack. This means that all messages up to and including that sequence number (delivery tag) have been acknowledged, starting from the message after the last acked sequence number (or from 1 if this is the first ack).
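The passage above reads as a description of publisher confirms. Assuming that, the receiving side has to treat an ack with the "multiple" flag as covering every outstanding sequence number up to the acked one. A minimal bookkeeping sketch follows; OutstandingConfirms and its methods are hypothetical names, not part of any client library.

```python
class OutstandingConfirms:
    """Tracks published messages that the broker has not yet confirmed."""

    def __init__(self):
        self.pending = {}  # sequence number -> message

    def published(self, seq_no, message):
        # Remember the message until the broker acks its sequence number.
        self.pending[seq_no] = message

    def handle_ack(self, seq_no, multiple):
        if multiple:
            # One ack covers every outstanding sequence number <= seq_no.
            for n in [n for n in self.pending if n <= seq_no]:
                del self.pending[n]
        else:
            # A plain ack covers exactly one message.
            self.pending.pop(seq_no, None)
```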
RabbitMQ has a plugin for a consistent hash exchange. Using that exchange, and one consumer per queue, we can preserve message order with multiple consumers. The hash exchange distributes routing keys among queues, instead of messages among queues. This means all messages with the same routing key will go to the same queue.
In order to consume messages there has to be a queue. When a new consumer is added, assuming there are already messages ready in the queue, deliveries will start immediately. The target queue can be empty at the time of consumer registration. In that case the first deliveries will happen when new messages are enqueued.
In general, the network aspect of "batching messages" is handled at the level of the basic.qos(prefetch-size, prefetch-count) parameters. In this scheme, the broker will send some number of bytes or messages (respectively) beyond the unacknowledged messages for a consumer, but the client library doles out messages, in process, one at a time to the application.
To maximize the benefit, the application can withhold basic.ack() for each message, and periodically issue basic.ack(delivery-tag=n, multiple=True) to acknowledge all messages with a delivery tag <= n.
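As a rough sketch of the withheld-ack approach: the function below processes a collected batch and then sends one ack for the whole batch. It assumes a channel object that exposes basic_ack(delivery_tag=..., multiple=...), as a pika channel does; flush_batch and handle_batch are hypothetical names, and the batch is assumed to be a list of (delivery_tag, body) pairs in delivery order.

```python
def flush_batch(channel, batch, handle_batch):
    """Process a batch of (delivery_tag, body) pairs, then ack them all at once."""
    if not batch:
        return
    # The amortized work (for example, one network IO call for all bodies).
    handle_batch([body for _tag, body in batch])
    # Delivery tags increase per channel, so the last one is the highest.
    last_tag = batch[-1][0]
    # multiple=True acknowledges every unacked delivery with tag <= last_tag.
    channel.basic_ack(delivery_tag=last_tag, multiple=True)
```

For this to work with batches of 500, the prefetch count has to be at least 500. With a smaller value the broker stops delivering once that many messages are unacknowledged, and the batch would only ever complete through the timer.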