Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Celery RabbitMQ broker failover connect issue

Tags:

I have 3 RabbitMQ nodes in cluster in HA mode. Each node is on separate Docker container.

I am using Celery version 4 and kombu version 4.

I have used this command to set HA policy:

rabbitmqctl set_policy ha-all "" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

Celery config looks like this:

CELERY = dict(
    broker_url=[
        'amqp://guest@rabbitmq1:5672',
        'amqp://guest@rabbitmq2:5672',
        'amqp://guest@rabbitmq3:5672',
    ],
   celery_queue_ha_policy='all',
   ...
)

Everything works fine until I stop master RabbitMQ application in order to test Celery failover feature using command:

rabbitmqctl stop_app

Immediately after RabbitMQ application is stopped I started seeing errors in log bellow. Frequency of log messages is very high and it doesn't slow down with number of attempts.

According to logs Celery tries to reconnect using next failover, but it gets interrupted by another try to reconnect to master node that was stopped. The same thing happens over and over like in infinite loop.

[2017-03-17 15:10:28,084: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@rabbitmq1:5672//: [Errno 111] Connection refused.
Will retry using next failover.

[2017-03-17 15:10:28,300: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2016 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': 'rabbit@rabbitmq1', 'platform': 'Erlang/OTP', 'version': '3.6.6'}, mechanisms: [u'PLAIN', u'AMQPLAIN'], locales: [u'en_US']
[2017-03-17 15:10:28,302: DEBUG/MainProcess] ^-- substep ok
[2017-03-17 15:10:28,303: DEBUG/MainProcess] | Consumer: Starting Mingle
[2017-03-17 15:10:28,303: INFO/MainProcess] mingle: searching for neighbors
[2017-03-17 15:10:28,303: DEBUG/MainProcess] using channel_id: 1
[2017-03-17 15:10:28,318: DEBUG/MainProcess] Channel open
[2017-03-17 15:10:28,470: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 318, in start
    blueprint.start(self)
  File "/usr/local/lib/python2.7/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/mingle.py", line 38, in start
    self.sync(c)
  File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/mingle.py", line 42, in sync
    replies = self.send_hello(c)
  File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/mingle.py", line 55, in send_hello
    replies = inspect.hello(c.hostname, our_revoked._data) or {}
  File "/usr/local/lib/python2.7/site-packages/celery/app/control.py", line 129, in hello
    return self._request('hello', from_node=from_node, revoked=revoked)
  File "/usr/local/lib/python2.7/site-packages/celery/app/control.py", line 81, in _request
    timeout=self.timeout, reply=True,
  File "/usr/local/lib/python2.7/site-packages/celery/app/control.py", line 436, in broadcast
    limit, callback, channel=channel,
  File "/usr/local/lib/python2.7/site-packages/kombu/pidbox.py", line 315, in _broadcast
    serializer=serializer)
  File "/usr/local/lib/python2.7/site-packages/kombu/pidbox.py", line 290, in _publish
    serializer=serializer,
  File "/usr/local/lib/python2.7/site-packages/kombu/messaging.py", line 181, in publish
    exchange_name, declare,
  File "/usr/local/lib/python2.7/site-packages/kombu/messaging.py", line 187, in _publish
    channel = self.channel
  File "/usr/local/lib/python2.7/site-packages/kombu/messaging.py", line 209, in _get_channel
    channel = self._channel = channel()
  File "/usr/local/lib/python2.7/site-packages/kombu/utils/functional.py", line 38, in __call__
    value = self.__value__ = self.__contract__()
  File "/usr/local/lib/python2.7/site-packages/kombu/messaging.py", line 224, in <lambda>
    channel = ChannelPromise(lambda: connection.default_channel)
  File "/usr/local/lib/python2.7/site-packages/kombu/connection.py", line 819, in default_channel
    self.connection
  File "/usr/local/lib/python2.7/site-packages/kombu/connection.py", line 802, in connection
    self._connection = self._establish_connection()
  File "/usr/local/lib/python2.7/site-packages/kombu/connection.py", line 757, in _establish_connection
    conn = self.transport.establish_connection()
  File "/usr/local/lib/python2.7/site-packages/kombu/transport/pyamqp.py", line 130, in establish_connection
    conn.connect()
  File "/usr/local/lib/python2.7/site-packages/amqp/connection.py", line 294, in connect
    self.transport.connect()
  File "/usr/local/lib/python2.7/site-packages/amqp/transport.py", line 120, in connect
    self._connect(self.host, self.port, self.connect_timeout)
  File "/usr/local/lib/python2.7/site-packages/amqp/transport.py", line 161, in _connect
    self.sock.connect(sa)
  File "/usr/local/lib/python2.7/socket.py", line 228, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 111] Connection refused
[2017-03-17 15:10:28,508: DEBUG/MainProcess] Closed channel #1
[2017-03-17 15:10:28,570: DEBUG/MainProcess] | Consumer: Restarting event loop...
[2017-03-17 15:10:28,572: DEBUG/MainProcess] | Consumer: Restarting Gossip...
[2017-03-17 15:10:28,575: DEBUG/MainProcess] | Consumer: Restarting Heart...
[2017-03-17 15:10:28,648: DEBUG/MainProcess] | Consumer: Restarting Control...
[2017-03-17 15:10:28,655: DEBUG/MainProcess] | Consumer: Restarting Tasks...
[2017-03-17 15:10:28,655: DEBUG/MainProcess] Canceling task consumer...
[2017-03-17 15:10:28,655: DEBUG/MainProcess] | Consumer: Restarting Mingle...
[2017-03-17 15:10:28,655: DEBUG/MainProcess] | Consumer: Restarting Events...
[2017-03-17 15:10:28,672: DEBUG/MainProcess] | Consumer: Restarting Connection...
[2017-03-17 15:10:28,673: DEBUG/MainProcess] | Consumer: Starting Connection
[2017-03-17 15:10:28,947: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@rabbitmq1:5672//: [Errno 111] Connection refused.
Will retry using next failover.

[2017-03-17 15:10:29,345: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2016 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': 'rabbit@rabbitmq1', 'platform': 'Erlang/OTP', 'version': '3.6.6'}, mechanisms: [u'PLAIN', u'AMQPLAIN'], locales: [u'en_US']
[2017-03-17 15:10:29,506: INFO/MainProcess] Connected to amqp://guest:**@rabbitmq2:5672//
[2017-03-17 15:10:29,535: DEBUG/MainProcess] ^-- substep ok
[2017-03-17 15:10:29,569: DEBUG/MainProcess] | Consumer: Starting Events
[2017-03-17 15:10:29,682: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@rabbitmq1:5672//: [Errno 111] Connection refused.
Will retry using next failover.

[2017-03-17 15:10:29,740: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2016 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': 'rabbit@rabbitmq1', 'platform': 'Erlang/OTP', 'version': '3.6.6'}, mechanisms: [u'PLAIN', u'AMQPLAIN'], locales: [u'en_US']
[2017-03-17 15:10:29,768: DEBUG/MainProcess] ^-- substep ok
[2017-03-17 15:10:29,770: DEBUG/MainProcess] | Consumer: Starting Mingle
[2017-03-17 15:10:29,770: INFO/MainProcess] mingle: searching for neighbors
[2017-03-17 15:10:29,771: DEBUG/MainProcess] using channel_id: 1
[2017-03-17 15:10:29,795: DEBUG/MainProcess] Channel open
[2017-03-17 15:10:29,874: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 318, in start
    blueprint.start(self)
  File "/usr/local/lib/python2.7/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/mingle.py", line 38, in start
    self.sync(c)
  File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/mingle.py", line 42, in sync
    replies = self.send_hello(c)
  File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/mingle.py", line 55, in send_hello
    replies = inspect.hello(c.hostname, our_revoked._data) or {}
  File "/usr/local/lib/python2.7/site-packages/celery/app/control.py", line 129, in hello
    return self._request('hello', from_node=from_node, revoked=revoked)
  File "/usr/local/lib/python2.7/site-packages/celery/app/control.py", line 81, in _request
    timeout=self.timeout, reply=True,
  File "/usr/local/lib/python2.7/site-packages/celery/app/control.py", line 436, in broadcast
    limit, callback, channel=channel,
  File "/usr/local/lib/python2.7/site-packages/kombu/pidbox.py", line 315, in _broadcast
    serializer=serializer)
  File "/usr/local/lib/python2.7/site-packages/kombu/pidbox.py", line 290, in _publish
    serializer=serializer,
  File "/usr/local/lib/python2.7/site-packages/kombu/messaging.py", line 181, in publish
    exchange_name, declare,
  File "/usr/local/lib/python2.7/site-packages/kombu/messaging.py", line 187, in _publish
    channel = self.channel
  File "/usr/local/lib/python2.7/site-packages/kombu/messaging.py", line 209, in _get_channel
    channel = self._channel = channel()
  File "/usr/local/lib/python2.7/site-packages/kombu/utils/functional.py", line 38, in __call__
    value = self.__value__ = self.__contract__()
  File "/usr/local/lib/python2.7/site-packages/kombu/messaging.py", line 224, in <lambda>
    channel = ChannelPromise(lambda: connection.default_channel)
  File "/usr/local/lib/python2.7/site-packages/kombu/connection.py", line 819, in default_channel
    self.connection
  File "/usr/local/lib/python2.7/site-packages/kombu/connection.py", line 802, in connection
    self._connection = self._establish_connection()
  File "/usr/local/lib/python2.7/site-packages/kombu/connection.py", line 757, in _establish_connection
    conn = self.transport.establish_connection()
  File "/usr/local/lib/python2.7/site-packages/kombu/transport/pyamqp.py", line 130, in establish_connection
    conn.connect()
  File "/usr/local/lib/python2.7/site-packages/amqp/connection.py", line 294, in connect
    self.transport.connect()
  File "/usr/local/lib/python2.7/site-packages/amqp/transport.py", line 120, in connect
    self._connect(self.host, self.port, self.connect_timeout)
  File "/usr/local/lib/python2.7/site-packages/amqp/transport.py", line 161, in _connect
    self.sock.connect(sa)
  File "/usr/local/lib/python2.7/socket.py", line 228, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 111] Connection refused
[2017-03-17 15:10:29,887: DEBUG/MainProcess] Closed channel #1
[2017-03-17 15:10:29,907: DEBUG/MainProcess] | Consumer: Restarting event loop...
[2017-03-17 15:10:29,908: DEBUG/MainProcess] | Consumer: Restarting Gossip...
[2017-03-17 15:10:29,908: DEBUG/MainProcess] | Consumer: Restarting Heart...
[2017-03-17 15:10:29,908: DEBUG/MainProcess] | Consumer: Restarting Control...
[2017-03-17 15:10:29,909: DEBUG/MainProcess] | Consumer: Restarting Tasks...
[2017-03-17 15:10:29,910: DEBUG/MainProcess] Canceling task consumer...
[2017-03-17 15:10:29,911: DEBUG/MainProcess] | Consumer: Restarting Mingle...
[2017-03-17 15:10:29,912: DEBUG/MainProcess] | Consumer: Restarting Events...
[2017-03-17 15:10:29,953: DEBUG/MainProcess] | Consumer: Restarting Connection...
[2017-03-17 15:10:29,954: DEBUG/MainProcess] | Consumer: Starting Connection
[2017-03-17 15:10:30,036: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@rabbitmq1:5672//: [Errno 111] Connection refused.
Will retry using next failover.

Unfortunately, Celery documentation doesn't say much about failover topic.

like image 511
draskomikic Avatar asked Mar 24 '17 10:03

draskomikic


1 Answers

Its definitely bug, I have created issue on GitHub: https://github.com/celery/celery/issues/3921

Thanks to George Psarakis I have managed to avoid bug using --without-mingle flag for Celery workers, eg:

celery worker -A app.tasks -l debug --without-mingle
like image 127
draskomikic Avatar answered Sep 23 '22 09:09

draskomikic