I am using a RabbitMQ producer to send long running tasks (30 mins+) to a consumer. The problem is that the consumer is still working on a task when the connection to the server is closed and the unacknowledged task is requeued.
From researching I understand that either a heartbeat or an increased connection timeout can be used to solve this. Both these solutions raise errors when attempting them. In reading answers to similar posts I've also learned that many changes have been implemented to RabbitMQ since the answers were posted (e.g. the default heartbeat timeout has changed to 60 from 580 prior to RabbitMQ 3.5.5).
When specifying a heartbeat and blocked connection timeout:
credentials = pika.PlainCredentials('user', 'password')
parameters = pika.ConnectionParameters('XXX.XXX.XXX.XXX', port, '/', credentials, blocked_connection_timeout=2000)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
The following error is displayed:
TypeError: __init__() got an unexpected keyword argument 'blocked_connection_timeout'
When specifying heartbeat_interval=1000
in the connection parameters a similar error is shown: TypeError: __init__() got an unexpected keyword argument 'heartbeat_interval'
And similarly for socket_timeout = 1000
the following error is displayed: TypeError: __init__() got an unexpected keyword argument 'socket_timeout'
I am running RabbitMQ 3.6.1, pika 0.10.0 and python 2.7 on Ubuntu 14.04.
I've read through answers to similar questions
Update: running code from the pika documentation produces the same error.
A common cause for connections being abruptly closed as soon as they're started is a TCP load balancer's heartbeat. If this is the case you should see these messages at very regular intervals, and the generally accepted practice seems to be to ignore them.
The default connection timeout for the RabbitMQ connection factory is 600 seconds (at least in the Java client API), hence your 10 minutes. You can change this by specifying to the connection factory your timeout of choice.
Verify Server Configuration Here are the recommended steps: Make sure the node is running using rabbitmq-diagnostics status. Verify config file is correctly placed and has correct syntax/structure. Inspect listeners using rabbitmq-diagnostics listeners or the listeners section in rabbitmq-diagnostics status.
In detail: The messages that RabbitMQ publish are persistent. When the message broker goes down, the events are not published and I am planning to store these events in a database and publish them again to RabbitMQ when it is up.
I've run into the same problem with my systems, that you are seeing, with dropped connection during very long tasks.
It's possible the heartbeat might help keep your connection alive, if your network setup is such that idle TCP/IP connections are forcefully dropped. If that's not the case, though, changing the heartbeat won't help.
Changing the connection timeout won't help at all. This setting is only used when initially creating the connection.
I am using a RabbitMQ producer to send long running tasks (30 mins+) to a consumer. The problem is that the consumer is still working on a task when the connection to the server is closed and the unacknowledged task is requeued.
there are two reasons for this, both of which you have run into already:
Having deployed RabbitMQ code with tasks that range from less than a second, out to several hours in time, I found that acknowledging the message immediately and updating the system with status messages works best for very long tasks, like this.
You will need to have a system of record (probably with a database) that keeps track of the status of a given job.
When the consumer picks up a message and starts the process, it should acknowledge the message right away and send a "started" status message to the system of record.
As the process completes, send another message to say it's done.
This won't solve the dropped connection problem, but nothing will 100% solve that anyways. Instead, it will prevent the message re-queueing problem from happening when a connection is dropped.
This solution does introduce another problem, though: when the long running process crashes, how do you resume the work?
The basic answer is to use the system of record (your database) status for the job to tell you that you need to pick up that work again. When the app starts, check the database to see if there is work that is unfinished. If there is, resume or restart that work in whatever manner is appropriate.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With