I have 3 RabbitMQ nodes in cluster in HA mode. Each node is on separate Docker container.
I am using Celery version 4 and kombu version 4.
I have used this command to set HA policy:
rabbitmqctl set_policy ha-all "" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
Celery config looks like this:
CELERY = dict(
broker_url=[
'amqp://guest@rabbitmq1:5672',
'amqp://guest@rabbitmq2:5672',
'amqp://guest@rabbitmq3:5672',
],
celery_queue_ha_policy='all',
...
)
Everything works fine until I stop master RabbitMQ application in order to test Celery failover feature using command:
rabbitmqctl stop_app
Immediately after RabbitMQ application is stopped I started seeing errors in log bellow. Frequency of log messages is very high and it doesn't slow down with number of attempts.
According to logs Celery tries to reconnect using next failover, but it gets interrupted by another try to reconnect to master node that was stopped. The same thing happens over and over like in infinite loop.
[2017-03-17 15:10:28,084: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@rabbitmq1:5672//: [Errno 111] Connection refused.
Will retry using next failover.
[2017-03-17 15:10:28,300: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL. See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2016 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': 'rabbit@rabbitmq1', 'platform': 'Erlang/OTP', 'version': '3.6.6'}, mechanisms: [u'PLAIN', u'AMQPLAIN'], locales: [u'en_US']
[2017-03-17 15:10:28,302: DEBUG/MainProcess] ^-- substep ok
[2017-03-17 15:10:28,303: DEBUG/MainProcess] | Consumer: Starting Mingle
[2017-03-17 15:10:28,303: INFO/MainProcess] mingle: searching for neighbors
[2017-03-17 15:10:28,303: DEBUG/MainProcess] using channel_id: 1
[2017-03-17 15:10:28,318: DEBUG/MainProcess] Channel open
[2017-03-17 15:10:28,470: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 318, in start
blueprint.start(self)
File "/usr/local/lib/python2.7/site-packages/celery/bootsteps.py", line 119, in start
step.start(parent)
File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/mingle.py", line 38, in start
self.sync(c)
File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/mingle.py", line 42, in sync
replies = self.send_hello(c)
File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/mingle.py", line 55, in send_hello
replies = inspect.hello(c.hostname, our_revoked._data) or {}
File "/usr/local/lib/python2.7/site-packages/celery/app/control.py", line 129, in hello
return self._request('hello', from_node=from_node, revoked=revoked)
File "/usr/local/lib/python2.7/site-packages/celery/app/control.py", line 81, in _request
timeout=self.timeout, reply=True,
File "/usr/local/lib/python2.7/site-packages/celery/app/control.py", line 436, in broadcast
limit, callback, channel=channel,
File "/usr/local/lib/python2.7/site-packages/kombu/pidbox.py", line 315, in _broadcast
serializer=serializer)
File "/usr/local/lib/python2.7/site-packages/kombu/pidbox.py", line 290, in _publish
serializer=serializer,
File "/usr/local/lib/python2.7/site-packages/kombu/messaging.py", line 181, in publish
exchange_name, declare,
File "/usr/local/lib/python2.7/site-packages/kombu/messaging.py", line 187, in _publish
channel = self.channel
File "/usr/local/lib/python2.7/site-packages/kombu/messaging.py", line 209, in _get_channel
channel = self._channel = channel()
File "/usr/local/lib/python2.7/site-packages/kombu/utils/functional.py", line 38, in __call__
value = self.__value__ = self.__contract__()
File "/usr/local/lib/python2.7/site-packages/kombu/messaging.py", line 224, in <lambda>
channel = ChannelPromise(lambda: connection.default_channel)
File "/usr/local/lib/python2.7/site-packages/kombu/connection.py", line 819, in default_channel
self.connection
File "/usr/local/lib/python2.7/site-packages/kombu/connection.py", line 802, in connection
self._connection = self._establish_connection()
File "/usr/local/lib/python2.7/site-packages/kombu/connection.py", line 757, in _establish_connection
conn = self.transport.establish_connection()
File "/usr/local/lib/python2.7/site-packages/kombu/transport/pyamqp.py", line 130, in establish_connection
conn.connect()
File "/usr/local/lib/python2.7/site-packages/amqp/connection.py", line 294, in connect
self.transport.connect()
File "/usr/local/lib/python2.7/site-packages/amqp/transport.py", line 120, in connect
self._connect(self.host, self.port, self.connect_timeout)
File "/usr/local/lib/python2.7/site-packages/amqp/transport.py", line 161, in _connect
self.sock.connect(sa)
File "/usr/local/lib/python2.7/socket.py", line 228, in meth
return getattr(self._sock,name)(*args)
error: [Errno 111] Connection refused
[2017-03-17 15:10:28,508: DEBUG/MainProcess] Closed channel #1
[2017-03-17 15:10:28,570: DEBUG/MainProcess] | Consumer: Restarting event loop...
[2017-03-17 15:10:28,572: DEBUG/MainProcess] | Consumer: Restarting Gossip...
[2017-03-17 15:10:28,575: DEBUG/MainProcess] | Consumer: Restarting Heart...
[2017-03-17 15:10:28,648: DEBUG/MainProcess] | Consumer: Restarting Control...
[2017-03-17 15:10:28,655: DEBUG/MainProcess] | Consumer: Restarting Tasks...
[2017-03-17 15:10:28,655: DEBUG/MainProcess] Canceling task consumer...
[2017-03-17 15:10:28,655: DEBUG/MainProcess] | Consumer: Restarting Mingle...
[2017-03-17 15:10:28,655: DEBUG/MainProcess] | Consumer: Restarting Events...
[2017-03-17 15:10:28,672: DEBUG/MainProcess] | Consumer: Restarting Connection...
[2017-03-17 15:10:28,673: DEBUG/MainProcess] | Consumer: Starting Connection
[2017-03-17 15:10:28,947: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@rabbitmq1:5672//: [Errno 111] Connection refused.
Will retry using next failover.
[2017-03-17 15:10:29,345: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL. See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2016 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': 'rabbit@rabbitmq1', 'platform': 'Erlang/OTP', 'version': '3.6.6'}, mechanisms: [u'PLAIN', u'AMQPLAIN'], locales: [u'en_US']
[2017-03-17 15:10:29,506: INFO/MainProcess] Connected to amqp://guest:**@rabbitmq2:5672//
[2017-03-17 15:10:29,535: DEBUG/MainProcess] ^-- substep ok
[2017-03-17 15:10:29,569: DEBUG/MainProcess] | Consumer: Starting Events
[2017-03-17 15:10:29,682: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@rabbitmq1:5672//: [Errno 111] Connection refused.
Will retry using next failover.
[2017-03-17 15:10:29,740: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL. See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2016 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': 'rabbit@rabbitmq1', 'platform': 'Erlang/OTP', 'version': '3.6.6'}, mechanisms: [u'PLAIN', u'AMQPLAIN'], locales: [u'en_US']
[2017-03-17 15:10:29,768: DEBUG/MainProcess] ^-- substep ok
[2017-03-17 15:10:29,770: DEBUG/MainProcess] | Consumer: Starting Mingle
[2017-03-17 15:10:29,770: INFO/MainProcess] mingle: searching for neighbors
[2017-03-17 15:10:29,771: DEBUG/MainProcess] using channel_id: 1
[2017-03-17 15:10:29,795: DEBUG/MainProcess] Channel open
[2017-03-17 15:10:29,874: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 318, in start
blueprint.start(self)
File "/usr/local/lib/python2.7/site-packages/celery/bootsteps.py", line 119, in start
step.start(parent)
File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/mingle.py", line 38, in start
self.sync(c)
File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/mingle.py", line 42, in sync
replies = self.send_hello(c)
File "/usr/local/lib/python2.7/site-packages/celery/worker/consumer/mingle.py", line 55, in send_hello
replies = inspect.hello(c.hostname, our_revoked._data) or {}
File "/usr/local/lib/python2.7/site-packages/celery/app/control.py", line 129, in hello
return self._request('hello', from_node=from_node, revoked=revoked)
File "/usr/local/lib/python2.7/site-packages/celery/app/control.py", line 81, in _request
timeout=self.timeout, reply=True,
File "/usr/local/lib/python2.7/site-packages/celery/app/control.py", line 436, in broadcast
limit, callback, channel=channel,
File "/usr/local/lib/python2.7/site-packages/kombu/pidbox.py", line 315, in _broadcast
serializer=serializer)
File "/usr/local/lib/python2.7/site-packages/kombu/pidbox.py", line 290, in _publish
serializer=serializer,
File "/usr/local/lib/python2.7/site-packages/kombu/messaging.py", line 181, in publish
exchange_name, declare,
File "/usr/local/lib/python2.7/site-packages/kombu/messaging.py", line 187, in _publish
channel = self.channel
File "/usr/local/lib/python2.7/site-packages/kombu/messaging.py", line 209, in _get_channel
channel = self._channel = channel()
File "/usr/local/lib/python2.7/site-packages/kombu/utils/functional.py", line 38, in __call__
value = self.__value__ = self.__contract__()
File "/usr/local/lib/python2.7/site-packages/kombu/messaging.py", line 224, in <lambda>
channel = ChannelPromise(lambda: connection.default_channel)
File "/usr/local/lib/python2.7/site-packages/kombu/connection.py", line 819, in default_channel
self.connection
File "/usr/local/lib/python2.7/site-packages/kombu/connection.py", line 802, in connection
self._connection = self._establish_connection()
File "/usr/local/lib/python2.7/site-packages/kombu/connection.py", line 757, in _establish_connection
conn = self.transport.establish_connection()
File "/usr/local/lib/python2.7/site-packages/kombu/transport/pyamqp.py", line 130, in establish_connection
conn.connect()
File "/usr/local/lib/python2.7/site-packages/amqp/connection.py", line 294, in connect
self.transport.connect()
File "/usr/local/lib/python2.7/site-packages/amqp/transport.py", line 120, in connect
self._connect(self.host, self.port, self.connect_timeout)
File "/usr/local/lib/python2.7/site-packages/amqp/transport.py", line 161, in _connect
self.sock.connect(sa)
File "/usr/local/lib/python2.7/socket.py", line 228, in meth
return getattr(self._sock,name)(*args)
error: [Errno 111] Connection refused
[2017-03-17 15:10:29,887: DEBUG/MainProcess] Closed channel #1
[2017-03-17 15:10:29,907: DEBUG/MainProcess] | Consumer: Restarting event loop...
[2017-03-17 15:10:29,908: DEBUG/MainProcess] | Consumer: Restarting Gossip...
[2017-03-17 15:10:29,908: DEBUG/MainProcess] | Consumer: Restarting Heart...
[2017-03-17 15:10:29,908: DEBUG/MainProcess] | Consumer: Restarting Control...
[2017-03-17 15:10:29,909: DEBUG/MainProcess] | Consumer: Restarting Tasks...
[2017-03-17 15:10:29,910: DEBUG/MainProcess] Canceling task consumer...
[2017-03-17 15:10:29,911: DEBUG/MainProcess] | Consumer: Restarting Mingle...
[2017-03-17 15:10:29,912: DEBUG/MainProcess] | Consumer: Restarting Events...
[2017-03-17 15:10:29,953: DEBUG/MainProcess] | Consumer: Restarting Connection...
[2017-03-17 15:10:29,954: DEBUG/MainProcess] | Consumer: Starting Connection
[2017-03-17 15:10:30,036: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@rabbitmq1:5672//: [Errno 111] Connection refused.
Will retry using next failover.
Unfortunately, Celery documentation doesn't say much about failover topic.
Its definitely bug, I have created issue on GitHub: https://github.com/celery/celery/issues/3921
Thanks to George Psarakis I have managed to avoid bug using --without-mingle
flag for Celery workers, eg:
celery worker -A app.tasks -l debug --without-mingle
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With