Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Allow RabbitMQ and Pika maintain the conection always open

I've a Python script which reads stuff from a stream, and when a new string gets readed, it pushes its content (a string) to a RabbitMQ queue.

The thing is that the stream might not send messages in 1, 2 or 9h or so, so I would like to have the RabbitMQ connection always open.

The problem is that when I create the conection and the channel:

self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host, credentials=self.credentials))
channel = self.connection.channel()
channel.exchange_declare(exchange=self.exchange_name, exchange_type='fanout')

... and if after an hour a message arrives, I get this error:

  File "/usr/local/lib/python3.7/asyncio/events.py", line 88, in _run
    self._context.run(self._callback, *self._args)
  File "/var/opt/rabbitmq-agent.py", line 34, in push_to_queue
    raise Exception("Error sending the message to the queue: " + format(e))
Exception: Error sending the message to the queue: Send message to publisher error: Channel allocation requires an open connection: <SelectConnection CLOSED socket=None params=<ConnectionParameters host=x port=xvirtual_host=/ ssl=False>>

Which I suppose is that the connection has been closed between the rabbitmq server and client.

How can I avoid this? I would like to have a "please, keep the connection alive always". Maybe setting a super-big heartbeat in the connection parameters of Pika? Something like this:

self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host, credentials=self.credentials, heartbeat=6000))

Any other cooler solutions would be highly appreciated.

Thanks in advance

like image 695
Avión Avatar asked Sep 15 '25 10:09

Avión


1 Answers

I would suggest you check connection every time before sending message and if the connection is closed then simply reconnect.

if not self.connection or self.connection.is_closed:
    self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host, credentials=self.credentials))
    channel = self.connection.channel()
    channel.exchange_declare(exchange=self.exchange_name, exchange_type='fanout')
like image 183
Toni Sredanović Avatar answered Sep 18 '25 09:09

Toni Sredanović