Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RabbitMQ pika.exceptions.ConnectionClosed (-1, "error(104, 'Connection reset by peer')")

I have a task queue in RabbitMQ with multiple producers (12) and one consumer for heavy tasks in a webapp. When I run the consumer it starts dequeuing some of the messages before crashing with this error:

Traceback (most recent call last):
File "jobs.py", line 42, in <module> jobs[job](config)
File "/home/ec2-user/project/queue.py", line 100, in init_queue
channel.start_consuming()
File "/usr/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 1822, in start_consuming
self.connection.process_data_events(time_limit=None)
File "/usr/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 749, in process_data_events
self._flush_output(common_terminator)
File "/usr/lib/python2.7/site-packages/pika/adapters/blocking_connection.py", line 477, in _flush_output
result.reason_text)
pika.exceptions.ConnectionClosed: (-1, "error(104, 'Connection reset by peer')")

The producers code is:

message = {'image_url': image_url, 'image_name': image_name, 'notes': notes}

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='tasks_queue')
channel.basic_publish(exchange='', routing_key=queue_name, body=json.dumps(message))

connection.close()

And the only consumer's code (the one is clashing):

def callback(self, ch, method, properties, body):
    """Callback when receive a message."""
    message = json.loads(body)
    try:
        image = _get_image(message['image_url'])
    except:
        sys.stderr.write('Error getting image in note %s' % note['id'])
   # Crop image with PIL. Not so expensive
   box_path = _crop(image, message['image_name'], box)

   # API call. Long time function
   result = long_api_call(box_path)

   if result is None:
       sys.stderr.write('Error in note %s' % note['id'])
       return
   # update the db
   db.update_record(result)


connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='tasks_queue')
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback_obj.callback, queue='tasks_queue', no_ack=True)
channel.start_consuming()

As you can see, there are 3 expensive functions for message. One crop task, one API call and one database update. Without the API call, que consumer runs smoothly.

Thanks in advance

like image 605
user3753792 Avatar asked Oct 24 '18 15:10

user3753792


1 Answers

Your RabbitMQ log shows a message that I thought we might see:

missed heartbeats from client, timeout: 60s

What's happening is that your long_api_call blocks Pika's I/O loop. Pika is a very lightweight library and does not start threads in the background for you so you must code in such a way as to not block Pika's I/O loop longer than the heartbeat interval. RabbitMQ thinks your client has died or is unresponsive and forcibly closes the connection.

Please see my answer here which links to this example code showing how to properly execute a long-running task in a separate thread. You can still use no_ack=True, you will just skip the ack_message call.


NOTE: the RabbitMQ team monitors the rabbitmq-users mailing list and only sometimes answers questions on StackOverflow.

like image 113
Luke Bakken Avatar answered Sep 19 '22 12:09

Luke Bakken