
How to reconnect to RabbitMQ?

My Python script has to send a message to RabbitMQ each time it receives one from another data source. The interval between sends varies, from about 1 minute to 30 minutes.

Here's how I establish a connection to RabbitMQ:

  import pika

  rbt_conn = pika.BlockingConnection(pika.ConnectionParameters("some_host"))
  channel = rbt_conn.channel()

I just got this exception:

pika.exceptions.ConnectionClosed

How can I reconnect to it? What's the best way? Is there any "strategy"? Is there a way to send pings to keep the connection alive, or to set a timeout?

Any pointers will be appreciated.

Alan Coromano asked Feb 04 '16 05:02



1 Answer

RabbitMQ uses heartbeats to detect and close "dead" connections and to prevent network devices (firewalls etc.) from terminating "idle" connections. From version 3.5.5 on, the default timeout is set to 60 seconds (previously it was ~10 minutes). From the docs:

Heartbeat frames are sent about every timeout / 2 seconds. After two missed heartbeats, the peer is considered to be unreachable.
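
To put numbers on that rule, here is a rough sketch of the arithmetic. The function names are made up for illustration, and the exact moment a broker closes a silent connection may differ; this only encodes the rule of thumb quoted above.

```python
def heartbeat_timing(timeout_seconds):
    """Return (send_interval, unreachable_after) in seconds for a negotiated timeout."""
    send_interval = timeout_seconds / 2       # "about every timeout / 2 seconds"
    unreachable_after = 2 * send_interval     # two missed heartbeats
    return send_interval, unreachable_after


def idle_gap_is_risky(idle_seconds, timeout_seconds):
    """True when a client that stays silent for idle_seconds exceeds that window."""
    if timeout_seconds == 0:                  # 0 means heartbeats are disabled
        return False
    _, unreachable_after = heartbeat_timing(timeout_seconds)
    return idle_seconds >= unreachable_after


# The 1 minute and 30 minute gaps from the question, against a 60 second timeout.
for gap in (60, 30 * 60):
    print(gap, idle_gap_is_risky(gap, timeout_seconds=60))
```

With a 60 second timeout, both gaps from the question fall outside the window, which is consistent with the ConnectionClosed error.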

The problem with Pika's BlockingConnection is that it is unable to respond to heartbeats until some API call is made (for example, channel.basic_publish(), connection.sleep(), etc).
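
One way to work with that limitation is to do the waiting through the connection itself. The sketch below is only an illustration: wait_for_next_message and poll_source are hypothetical names, and poll_source is assumed to be a non-blocking check of your other data source that returns None when nothing is ready. The only Pika call it relies on is connection.sleep().

```python
import time


def wait_for_next_message(connection, poll_source, slice_seconds=5.0,
                          timeout_seconds=None):
    """Poll poll_source() until it yields a value, idling via connection.sleep().

    connection is expected to behave like pika.BlockingConnection.
    Returns the value, or None if timeout_seconds elapses first.
    """
    deadline = None
    if timeout_seconds is not None:
        deadline = time.monotonic() + timeout_seconds
    while True:
        item = poll_source()
        if item is not None:
            return item
        if deadline is not None and time.monotonic() >= deadline:
            return None
        # Unlike time.sleep(), this lets Pika process heartbeat frames.
        connection.sleep(slice_seconds)
```

Keeping slice_seconds well below half the heartbeat timeout should leave enough room for heartbeats to be answered in time.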

The approaches I found so far:

Increase or deactivate the timeout

RabbitMQ negotiates the timeout with the client when establishing the connection. In theory, it should be possible to override the server default with a larger value using the heartbeat_interval argument, but the current Pika version (0.10.0) uses the smaller of the values offered by the server and the client, so a larger client value has no effect. This issue is fixed on current master.

On the other hand, it is possible to deactivate the heartbeat functionality completely by setting the heartbeat_interval argument to 0, which may well lead to new issues (firewalls dropping idle connections, etc.).
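
A sketch of passing the heartbeat value follows. The helper name heartbeat_kwargs is made up for illustration. It assumes that Pika releases before 1.0 accept the heartbeat_interval keyword used above and that 1.0 and later expect heartbeat instead; please check the ConnectionParameters documentation for the version you have installed.

```python
def heartbeat_kwargs(seconds, pika_version):
    """Build the ConnectionParameters keyword for the heartbeat timeout.

    seconds=0 asks for heartbeats to be disabled.
    Assumption: the keyword is `heartbeat` from Pika 1.0 on and
    `heartbeat_interval` before that.
    """
    major = int(pika_version.split(".")[0])
    name = "heartbeat" if major >= 1 else "heartbeat_interval"
    return {name: seconds}


print(heartbeat_kwargs(600, "0.10.0"))
print(heartbeat_kwargs(0, "1.3.2"))
```

With Pika installed this would be used as pika.ConnectionParameters(host="some_host", **heartbeat_kwargs(600, pika.__version__)). On 0.10.0 a value larger than the server's is still negotiated down, as described above.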

Reconnecting

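Expanding on @itsafire's answer, you can write your own publisher class that reconnects when required. A naive example implementation follows. Call connect() once before the first publish(), since publish() only reconnects after a ConnectionClosed error: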

import logging
import json
import pika

class Publisher:
    EXCHANGE = 'my_exchange'
    TYPE = 'topic'
    ROUTING_KEY = 'some_routing_key'

    def __init__(self, host, virtual_host, username, password):
        self._params = pika.connection.ConnectionParameters(
            host=host,
            virtual_host=virtual_host,
            credentials=pika.credentials.PlainCredentials(username, password))
        self._conn = None
        self._channel = None

    def connect(self):
        if not self._conn or self._conn.is_closed:
            self._conn = pika.BlockingConnection(self._params)
            self._channel = self._conn.channel()
            # The keyword is exchange_type in current Pika (older code used type=).
            self._channel.exchange_declare(exchange=self.EXCHANGE,
                                           exchange_type=self.TYPE)

    def _publish(self, msg):
        self._channel.basic_publish(exchange=self.EXCHANGE,
                                    routing_key=self.ROUTING_KEY,
                                    body=json.dumps(msg).encode())
        logging.debug('message sent: %s', msg)

    def publish(self, msg):
        """Publish msg, reconnecting if necessary."""

        try:
            self._publish(msg)
        except pika.exceptions.ConnectionClosed:
            logging.debug('reconnecting to queue')
            self.connect()
            self._publish(msg)

    def close(self):
        if self._conn and self._conn.is_open:
            logging.debug('closing queue connection')
            self._conn.close()
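
The class above retries exactly once, and the second attempt can fail too if the broker is still down. A possible refinement, sketched under my own assumptions, is a bounded retry with exponential backoff. publish_with_retry and its parameters are hypothetical names, not part of Pika; the publish and reconnect callables are supplied by the caller.

```python
import time


def publish_with_retry(publish, reconnect, retry_on, attempts=5,
                       base_delay=1.0, sleep=time.sleep):
    """Call publish(); on a retry_on error, back off, reconnect and try again.

    publish and reconnect are zero-argument callables; retry_on is an
    exception class (or tuple of classes) that signals a lost connection.
    """
    for attempt in range(attempts):
        try:
            if attempt:
                reconnect()                    # may itself raise while the broker is down
            return publish()
        except retry_on:
            if attempt == attempts - 1:
                raise                          # out of attempts: surface the last error
            sleep(base_delay * 2 ** attempt)   # 1s, 2s, 4s, ...
```

With the Publisher above it could be called as publish_with_retry(lambda: p._publish(msg), p.connect, pika.exceptions.ConnectionClosed), where p is a connected Publisher instance.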

Other possibilities

Other possibilities that I have not explored yet:

  • Using an asynchronous adapter for publishing
  • Keeping your RabbitMQ connection and your "publish" code on a background thread, which periodically calls connection.sleep() to respond to server heartbeats.
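
As a rough idea of what the background-thread option could look like, here is an untested-against-a-real-broker sketch. BackgroundPublisher and the connect factory are hypothetical names; connect is assumed to return an object that behaves like pika.BlockingConnection. The connection is created and used on the worker thread only, and other threads hand messages over through a queue. Reconnection and error handling are left out to keep it short.

```python
import queue
import threading


class BackgroundPublisher:
    """Owns a blocking connection on one worker thread and sends queued bodies."""

    def __init__(self, connect, exchange, routing_key, idle_seconds=1.0):
        self._connect = connect              # hypothetical factory returning a connection
        self._exchange = exchange
        self._routing_key = routing_key
        self._idle_seconds = idle_seconds
        self._outbox = queue.Queue()         # thread-safe hand-off from other threads
        self._stopping = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def publish(self, body):
        """Safe to call from any thread; the worker does the actual send."""
        self._outbox.put(body)

    def stop(self, timeout=None):
        self._stopping.set()
        self._thread.join(timeout)

    def _run(self):
        connection = self._connect()         # created and used on this thread only
        channel = connection.channel()
        try:
            while not self._stopping.is_set():
                try:
                    body = self._outbox.get_nowait()
                except queue.Empty:
                    # Nothing to send: idle inside the connection so that
                    # server heartbeats are answered.
                    connection.sleep(self._idle_seconds)
                    continue
                channel.basic_publish(exchange=self._exchange,
                                      routing_key=self._routing_key,
                                      body=body)
        finally:
            connection.close()
```

A message can wait up to idle_seconds before it is sent, so that value trades latency against how often the worker wakes up.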

el.atomo answered Sep 21 '22 01:09