 

RabbitMQ disconnects me after some time

I'm trying to listen to a queue continuously, but after about a minute (assuming my queue is empty) I get disconnected with this error:

DEBUG:pika.adapters.blocking_connection:Outbound buffer size: 0
DEBUG:pika.adapters.blocking_connection:Outbound buffer size: 0
ERROR:pika.adapters.base_connection:Read empty data, calling disconnect
DEBUG:pika.adapters.blocking_connection:Handling disconnect
INFO:pika.adapters.blocking_connection:on_connection_closed: None, True
WARNING:pika.adapters.blocking_connection:Received Channel.Close, closing: None
DEBUG:pika.callback:Clearing out '1' from the stack
Traceback (most recent call last):
  File "controller.py", line 59, in <module>
    c.run()
  File "controller.py", line 55, in run
    self.listen_queue() # Blocking function
  File "controller.py", line 25, in listen_queue
    self.channel.start_consuming() # Start consuming
  File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 814, in start_consuming
    self.connection.process_data_events()
  File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 168, in process_data_events
    if self._handle_read():
  File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 272, in _handle_read
    super(BlockingConnection, self)._handle_read()
  File "/usr/local/lib/python2.7/dist-packages/pika/adapters/base_connection.py", line 315, in _handle_read
    return self._handle_disconnect()
  File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 263, in _handle_disconnect
    self._on_connection_closed(None, True)
  File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 321, in _on_connection_closed
    self._channels[channel]._on_close(method_frame)
  File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 914, in _on_close
    raise exceptions.ChannelClosed(0, 'Not specified')
pika.exceptions.ChannelClosed: (0, 'Not specified')

And this is my code:

class RabbitConnector():

    def __init__(self):
        self._connect()

    def _connect(self):
        logger.info('Trying to connect to RabbitMQ')
        while True:
            try:
                conn_broker = pika.BlockingConnection(
                    pika.ConnectionParameters(
                        host=conf.rabbit_server,
                        port=conf.rabbit_port,
                        virtual_host=conf.rabbit_vhost,
                        ssl=conf.rabbit_ssl, # do not set it to True if there is no ssl!
                        heartbeat_interval=conf.rabbit_heartbeat_interval,
                        credentials=pika.PlainCredentials(
                            conf.rabbit_user,
                            conf.rabbit_pass)))
                logger.info('Successfully connected to Rabbit at %s:%s' % (conf.rabbit_server, conf.rabbit_port)) 
                self.channel = conn_broker.channel()
                # Don't dispatch a new message to a worker until it has processed and acknowledged the previous one
                self.channel.basic_qos(prefetch_count=conf.rabbit_prefetch_count)
                status = self.channel.queue_declare(queue=conf.rabbit_queue_name,
                                                    durable=conf.rabbit_queue_durable,
                                                    exclusive=conf.rabbit_queue_exclusive,
                                                    passive=conf.rabbit_queue_passive)
                if status.method.message_count == 0:
                    logger.info("Queue empty")
                else:
                    logger.info('Queue status: %s' % status)                  
                self.channel.queue_bind(
                    queue=conf.rabbit_queue_name,
                    exchange=conf.rabbit_exchange_name,
                    routing_key=conf.rabbit_exchange_routing_key)  
            except (pika.exceptions.AMQPConnectionError, pika.exceptions.AMQPChannelError) as e:
                # Log the failure first, then wait before retrying
                logger.error('Exception while connecting to Rabbit %s' % e)
                time.sleep(3)
            else:
                break

    def get_channel(self):
        return self.channel
Vor asked Nov 03 '22 18:11


2 Answers

You can also lower heartbeat_interval to a smaller value, e.g. 25 (seconds), so that the connection does not sit idle for long between heartbeats.
Based on https://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2013-March/025824.html
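
As a rough sketch of the suggestion above: the helper below is hypothetical (heartbeat_params is not part of pika) and only builds the heartbeat keyword that would be passed to pika.ConnectionParameters. It assumes the connection is dropped after roughly 60 idle seconds, as described in the question, and that newer pika releases name the argument heartbeat instead of heartbeat_interval; check this against the version you have installed.

```python
def heartbeat_params(heartbeat_seconds=25, idle_timeout_seconds=60, legacy_name=True):
    """Hypothetical helper: build the heartbeat keyword for pika.ConnectionParameters.

    idle_timeout_seconds is an assumption about when the idle connection gets
    dropped (about a minute in the question); it is not a pika setting.
    """
    if heartbeat_seconds <= 0:
        raise ValueError("heartbeat must be positive")
    if heartbeat_seconds >= idle_timeout_seconds:
        raise ValueError("heartbeat must be shorter than the idle timeout")
    # The question's code uses `heartbeat_interval`; newer pika releases
    # are assumed here to expect `heartbeat` instead.
    key = "heartbeat_interval" if legacy_name else "heartbeat"
    return {key: heartbeat_seconds}


params = heartbeat_params()
print(params)

# With pika installed, the dict could be merged into the existing settings:
# pika.ConnectionParameters(host=conf.rabbit_server, **params)
```

In the question's code the same effect comes from setting conf.rabbit_heartbeat_interval to 25.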

Arik answered Nov 09 '22 16:11


I experienced the same issue when the queue was left empty for some time: the connection was lost. The cause turned out to be a firewall. Check your firewall rules for the IP address of the connection.

Putna answered Nov 09 '22 14:11