I've got a Python worker client that spins up 10 workers, each of which hooks onto a RabbitMQ queue. A bit like this:
#!/usr/bin/python
import multiprocessing

import pika

worker_count = 10
qname = 'myqueue'   # placeholder queue name

def mqworker():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='mqhost'))
    channel = connection.channel()
    channel.queue_declare(queue=qname, durable=True)
    channel.basic_consume(callback, queue=qname, no_ack=False)
    channel.basic_qos(prefetch_count=1)
    channel.start_consuming()

def callback(ch, method, properties, body):
    doSomeWork()   # placeholder for the real job
    ch.basic_ack(delivery_tag=method.delivery_tag)

if __name__ == '__main__':
    for i in range(worker_count):
        worker = multiprocessing.Process(target=mqworker)
        worker.start()
The issue I have is that despite setting basic_qos on the channel, the first worker to start accepts all the messages off the queue, whilst the others sit there idle. I can see this in the RabbitMQ interface: even when I set worker_count to 1 and dump 50 messages on the queue, all 50 go into the 'unacknowledged' bucket, whereas I'd expect 1 to become unacknowledged and the other 49 to stay ready.
Why isn't this working?
RabbitMQ allows you to set the prefetch limit either per consumer or per channel using this method. basic_qos takes a global flag: setting it to false applies the prefetch count to each new consumer individually, while setting it to true applies a single channel-wide prefetch count shared by all consumers. Most client APIs set the global flag to false by default.
In order to consume messages there has to be a queue. When a new consumer is added and there are already messages ready in the queue, deliveries start immediately. The target queue can also be empty at the time of consumer registration; in that case the first deliveries happen when new messages are enqueued.
Queues in RabbitMQ are ordered collections of messages: messages are enqueued and dequeued (delivered to consumers) in FIFO order.
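For example, here is a minimal publisher sketch that enqueues durable messages for those workers to pick up; the host and queue name are the same placeholders used in the question's code, not anything your setup requires:

#!/usr/bin/python
import pika

# Sketch of a publisher that fills the queue the workers drain.
connection = pika.BlockingConnection(pika.ConnectionParameters(host='mqhost'))
channel = connection.channel()
channel.queue_declare(queue='myqueue', durable=True)

for i in range(50):
    channel.basic_publish(
        exchange='',                   # default exchange routes by queue name
        routing_key='myqueue',
        body='task %d' % i,
        properties=pika.BasicProperties(delivery_mode=2))  # persist the message

connection.close()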
When both kinds of limit are set, RabbitMQ enforces the per-consumer and per-channel prefetch limits independently of each other; consumers will only receive new messages when neither limit on unacknowledged messages has been reached.
Your setup is the Work Queue pattern from the RabbitMQ Python tutorial (which uses the Pika client, version 1.0.0): a queue used to distribute time-consuming tasks among multiple workers.
To stop the first worker from grabbing everything, we can use the basic_qos channel method with the prefetch_count=1 setting. This uses the basic.qos protocol method to tell RabbitMQ not to give more than one message to a worker at a time.
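As a rough sketch of the two modes (note the keyword name is an assumption that varies by release: Pika 1.x calls it global_qos, while older Pika versions call the same flag all_channels, so check your installed version):

import pika

# Sketch of the per-consumer vs. per-channel prefetch modes.
connection = pika.BlockingConnection(pika.ConnectionParameters(host='mqhost'))
channel = connection.channel()

# global=False: each consumer on this channel may hold at most one
# unacknowledged message at a time.
channel.basic_qos(prefetch_count=1, global_qos=False)

# global=True: all consumers on this channel share a budget of ten
# unacknowledged messages between them.
channel.basic_qos(prefetch_count=10, global_qos=True)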
I appear to have solved this by moving where basic_qos is called. Placing it just after channel = connection.channel() appears to alter the behaviour to what I'd expect.
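For reference, a minimal sketch of the reordered worker, reusing the placeholder names (and the older Pika-style basic_consume call) from the question's code:

def mqworker():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='mqhost'))
    channel = connection.channel()
    # Set the prefetch limit before registering the consumer, so it is
    # already in force when deliveries begin.
    channel.basic_qos(prefetch_count=1)
    channel.queue_declare(queue=qname, durable=True)
    channel.basic_consume(callback, queue=qname, no_ack=False)
    channel.start_consuming()

With that ordering, each worker holds at most one unacknowledged message at a time and the rest of the messages stay in the ready state until a worker acknowledges and can take another.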