
RabbitMQ closes connection when processing long running tasks and timeout settings produce errors

I am using a RabbitMQ producer to send long-running tasks (30+ mins) to a consumer. The problem is that the connection to the server is closed while the consumer is still working on a task, and the unacknowledged task is requeued.

From my research, I understand that either a heartbeat or an increased connection timeout can be used to solve this, but both approaches raise errors when I attempt them. From reading answers to similar posts, I've also learned that RabbitMQ has changed a good deal since those answers were posted (e.g. the default heartbeat timeout was 580 seconds prior to RabbitMQ 3.5.5 and is now 60).

When specifying a blocked connection timeout:

import pika

port = 5672  # assumed here: the default AMQP port
credentials = pika.PlainCredentials('user', 'password')
parameters = pika.ConnectionParameters('XXX.XXX.XXX.XXX', port, '/', credentials, blocked_connection_timeout=2000)
connection = pika.BlockingConnection(parameters)

channel = connection.channel()

The following error is displayed:

TypeError: __init__() got an unexpected keyword argument 'blocked_connection_timeout'

When specifying heartbeat_interval=1000 in the connection parameters, a similar error is shown: TypeError: __init__() got an unexpected keyword argument 'heartbeat_interval'

Similarly, for socket_timeout=1000 the following error is displayed: TypeError: __init__() got an unexpected keyword argument 'socket_timeout'

I am running RabbitMQ 3.6.1, pika 0.10.0 and python 2.7 on Ubuntu 14.04.

  1. Why are the above approaches producing errors?
  2. Can a heartbeat approach be used where there is a long-running continuous task? For example, can heartbeats be used when performing large database joins which take 30+ mins? I favour the heartbeat approach because it is often difficult to judge how long a task such as a database join will take.

I've read through answers to similar questions

Update: running code from the pika documentation produces the same error.

Greg asked Mar 21 '16 04:03



1 Answer

I've run into the same problem in my own systems that you are seeing: dropped connections during very long tasks.

It's possible the heartbeat might help keep your connection alive, if your network setup is such that idle TCP/IP connections are forcefully dropped. If that's not the case, though, changing the heartbeat won't help.

Changing the connection timeout won't help at all. This setting is only used when initially creating the connection.

You wrote: "I am using a RabbitMQ producer to send long running tasks (30 mins+) to a consumer. The problem is that the consumer is still working on a task when the connection to the server is closed and the unacknowledged task is requeued."

There are two issues behind this, both of which you have run into already:

  1. Connections drop randomly, even under the best of circumstances
  2. Re-starting a process because of a re-queued message can cause problems

Having deployed RabbitMQ code with tasks that range from less than a second to several hours, I found that acknowledging the message immediately and updating the system with status messages works best for very long tasks like this.

You will need to have a system of record (probably with a database) that keeps track of the status of a given job.

When the consumer picks up a message and starts the process, it should acknowledge the message right away and send a "started" status message to the system of record.

When the process completes, it should send another status message to say it's done.

This won't solve the dropped connection problem, but nothing will solve that 100% anyway. Instead, it prevents the message from being requeued when a connection is dropped.
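The acknowledge-early pattern described above could be sketched roughly as follows. This is only an illustration under assumptions, not the exact setup from the systems mentioned: SQLite stands in for the system of record, and the `jobs` table, the `job_id` field and the helper names (`open_job_store`, `set_status`, `make_callback`) are all hypothetical. The callback takes the four arguments (channel, method, properties, body) that pika passes to consumer callbacks.

```python
import json
import sqlite3


def open_job_store(path=":memory:"):
    """Open the system of record (SQLite here, purely for illustration)."""
    db = sqlite3.connect(path)
    db.execute(
        "CREATE TABLE IF NOT EXISTS jobs ("
        "job_id TEXT PRIMARY KEY, payload TEXT NOT NULL, status TEXT NOT NULL)"
    )
    db.commit()
    return db


def set_status(db, job_id, payload, status):
    """Insert or update the status row for one job."""
    db.execute(
        "INSERT OR REPLACE INTO jobs (job_id, payload, status) VALUES (?, ?, ?)",
        (job_id, payload, status),
    )
    db.commit()


def make_callback(db, do_work):
    """Build a consumer callback that acks before the long task starts."""
    def on_message(channel, method, properties, body):
        job = json.loads(body)  # assumes a JSON body containing a "job_id" key
        payload = json.dumps(job)
        # Record the job as started, then ack right away.
        set_status(db, job["job_id"], payload, "started")
        channel.basic_ack(delivery_tag=method.delivery_tag)
        # The long-running part: a dropped connection can no longer requeue it.
        do_work(job)
        set_status(db, job["job_id"], payload, "done")
    return on_message
```

Writing the "started" row just before the ack is a choice made for this sketch, so that a crash between the two steps cannot lose the job; the broker would simply redeliver it.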

This solution does introduce another problem, though: when the long running process crashes, how do you resume the work?

The basic answer is to use the system of record (your database) status for the job to tell you that you need to pick up that work again. When the app starts, check the database to see if there is work that is unfinished. If there is, resume or restart that work in whatever manner is appropriate.
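A startup check along those lines might look like the sketch below. It assumes a hypothetical SQLite `jobs` table with `job_id`, `payload` and `status` columns, where a row left in the "started" state means the work never finished; `find_unfinished_jobs`, `recover_on_startup` and the `restart_job` callable are illustrative names, not part of any library.

```python
import sqlite3


def find_unfinished_jobs(db):
    """Return (job_id, payload) for every job that was started but never marked done."""
    rows = db.execute(
        "SELECT job_id, payload FROM jobs WHERE status = 'started' ORDER BY job_id"
    )
    return rows.fetchall()


def recover_on_startup(db, restart_job):
    """Re-run each unfinished job, mark it done, and return the ids handled."""
    recovered = []
    for job_id, payload in find_unfinished_jobs(db):
        # Resume or restart, whichever is appropriate for the task.
        restart_job(job_id, payload)
        db.execute("UPDATE jobs SET status = 'done' WHERE job_id = ?", (job_id,))
        db.commit()
        recovered.append(job_id)
    return recovered
```

Calling `recover_on_startup` once before the consumer begins taking new messages would pick up anything a crash left behind. Whether a job can be resumed partway or has to start over depends on the task itself.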

Derick Bailey answered Sep 19 '22 08:09