I want to process messages in several threads, but I'm getting an error when I execute this code:
from __future__ import with_statement

import random
import sys
import threading
import time
import traceback

import pika
from pika import connection, credentials
from pika.adapters.blocking_connection import BlockingConnection
from pika.adapters.select_connection import SelectConnection
from pika.connection import Connection


def doWork(body, args, channel):
    """Sleep for a random interval under ten seconds, then ack the delivery.

    Args:
        body: Message payload; accepted but not used here.
        args: Object exposing ``delivery_tag``; the caller below passes the
            method frame returned by ``channel.basic_get``.
        channel: Channel on which ``basic_ack`` is issued.

    Any exception raised by the ack is printed with its traceback rather
    than propagated, so a failed ack does not kill the thread noisily.
    """
    r = random.random()
    time.sleep(r * 10)
    try:
        # NOTE(review): this ack runs on a worker thread while the main
        # thread keeps calling basic_get on the same channel. pika's
        # documentation describes connections as not safe to share between
        # threads, which may explain the 406 "unknown delivery tag" channel
        # close -- confirm against the pika FAQ.
        channel.basic_ack(delivery_tag=args.delivery_tag)
    except Exception:
        traceback.print_exc()


auth = credentials.PlainCredentials(username="guest", password="guest")
params = connection.ConnectionParameters(host="localhost", credentials=auth)
conn = BlockingConnection(params)
channel = conn.channel()

# Poll "test_queue" about every 30 ms and hand each fetched message to its
# own daemon thread.
while True:
    time.sleep(0.03)
    try:
        method_frame, header_frame, body = channel.basic_get(queue="test_queue")
        if method_frame.NAME == 'Basic.GetEmpty':
            # Nothing was waiting in the queue; poll again.
            continue
        t = threading.Thread(target=doWork, args=[body, method_frame, channel])
        t.setDaemon(True)
        t.start()
    except Exception:
        # Report the failure and keep polling.
        traceback.print_exc()
Error description:
Traceback (most recent call last):
  File "C:\work\projects\mq\start.py", line 43, in <module>
    method_frame, header_frame, body = channel.basic_get(queue="test_queue")
  File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 318, in basic_get
    self.basic_get_(self, self._on_basic_get, ticket, queue, no_ack)
  File "C:\work\projects\mq\libs\pika\channel.py", line 469, in basic_get
    no_ack=no_ack))
  File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 244, in send_method
    self.connection.process_data_events()
  File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 94, in process_data_events
    self._handle_read()
  File "C:\work\projects\mq\libs\pika\adapters\base_connection.py", line 162, in _handle_read
    self._on_data_available(data)
  File "C:\work\projects\mq\libs\pika\connection.py", line 589, in _on_data_available
    frame) # Args
  File "C:\work\projects\mq\libs\pika\callback.py", line 124, in process
    callback(*args, **keywords)
  File "C:\work\projects\mq\libs\pika\adapters\blocking_connection.py", line 269, in _on_remote_close
    frame.method.reply_text)
AMQPChannelError: (406, 'PRECONDITION_FAILED - unknown delivery tag 204')
Versions: pika 0.9.5, RabbitMQ 2.6.1
The problem probably is that you're setting no_ack=True
like this:
# Subscribe to the queue with no_ack=True (the setting discussed here).
consumer_tag = channel.basic_consume(
    message_delivery_event,
    queue=queue,
    no_ack=True,
)
And then acknowledging the messages:
# Explicit acknowledgement of the delivery identified by args.delivery_tag.
channel.basic_ack(delivery_tag=args.delivery_tag)
You have to choose whether you want to acknowledge messages or not, and set the consume parameter accordingly.
If you love us, you can donate to us via PayPal or buy us a coffee so we can maintain and grow! Thank you!
Donate Us With