Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Pika + RabbitMQ: setting basic_qos to prefetch=1 still appears to consume all messages in the queue

Tags:

rabbitmq

pika

qos

I've got a python worker client that spins up a 10 workers which each hook onto a RabbitMQ queue. A bit like this:

#!/usr/bin/python
worker_count=10

def mqworker(queue, configurer):
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='mqhost'))
    channel = connection.channel()
    channel.queue_declare(queue=qname, durable=True)
    channel.basic_consume(callback,queue=qname,no_ack=False)
    channel.basic_qos(prefetch_count=1)
    channel.start_consuming()


def callback(ch, method, properties, body):
    doSomeWork();
    ch.basic_ack(delivery_tag = method.delivery_tag)

if __name__ == '__main__':
    for i in range(worker_count):
        worker = multiprocessing.Process(target=mqworker)
        worker.start()

The issue I have is that despite setting basic_qos on the channel, the first worker to start accepts all the messages off the queue, whilst the others sit there idle. I can see this in the rabbitmq interface, that even when I set worker_count to be 1 and dump 50 messages on the queue, all 50 go into the 'unacknowledged' bucket, whereas I'd expect 1 to become unacknowledged and the other 49 to be ready.

Why isn't this working?

like image 237
growse Avatar asked Sep 14 '12 14:09

growse


People also ask

How do you change the prefetch count in RabbitMQ?

RabbitMQ allows you to set either a channel or consumer count using this method. The basic_qos function contains the global flag. Setting the value to false applies the count to each new consumer. Setting the value to true applies a channel prefetch count to all consumers.

How do you consume messages from RabbitMQ?

In order to consume messages there has to be a queue. When a new consumer is added, assuming there are already messages ready in the queue, deliveries will start immediately. The target queue can be empty at the time of consumer registration. In that case first deliveries will happen when new messages are enqueued.

What are queued messages in RabbitMQ?

Queues in RabbitMQ are ordered collections of messages. Messages are enqueued and dequeued (delivered to consumers) in the FIFO manner.

How does the prefetch limit work in RabbitMQ?

RabbitMQ interprets this as meaning that the two prefetch limits should be enforced independently of each other; consumers will only receive new messages when neither limit on unacknowledged messages has been reached.

What is pika RabbitMQ work queue?

As with other Python tutorials, we will use the Pika RabbitMQ client version 1.0.0. In the first tutorial we wrote programs to send and receive messages from a named queue. In this one we'll create a Work Queue that will be used to distribute time-consuming tasks among multiple workers.

How to prevent RabbitMQ from sending more than one message at once?

In order to defeat that we can use the Channel#basic_qos channel method with the prefetch_count=1 setting. This uses the basic.qos protocol method to tell RabbitMQ not to give more than one message to a worker at a time.

How do I set the Count of a channel in RabbitMQ?

RabbitMQ allows you to set either a channel or consumer count using this method. The basic_qos function contains the global flag. Setting the value to false applies the count to each new consumer. Setting the value to true applies a channel prefetch count to all consumers. Most APIs set the global flag to false by default.


1 Answers

I appear to have solved this by moving where basic_qos is called.

Placing it just after channel = connection.channel() appears to alter the behaviour to what I'd expect.

like image 171
growse Avatar answered Sep 17 '22 15:09

growse