I have a task that fetches some information remotely with urllib2, and I invoked it a few thousand times. The tasks are scheduled with a random eta (within a week) so they don't all hit the server at the same time. Sometimes I get a 404, sometimes not. I am handling the error in case it happens.
In the RabbitMQ console I can see 16 unacknowledged messages.
I stopped celery, purged the queue and restarted it. The 16 unacknowledged messages were still there.
I have other tasks that go to the same queue, and none of them was executed either. After purging, I tried to submit another task, and its state remains ready.
Any ideas how I can find out why messages remain unacknowledged?
Versions:
celery==3.1.4
{rabbit,"RabbitMQ","3.5.3"}
celeryapp.py
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'social_grabber': {
        'task': '<django app>.tasks.task_social_grabber',
        'schedule': crontab(hour=5, minute=0, day_of_week='sunday'),
    },
}
tasks.py
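from random import randint

from .celeryapp import app

@app.task
def task_social_grabber():
    # `users` and `task_social_grabber_single` are defined elsewhere (not shown)
    for user in users:
        eta = randint(0, 60 * 60 * 24 * 7)  # random delay of up to one week, in seconds
        # args must be a tuple: (user,) rather than (user)
        task_social_grabber_single.apply_async((user,), countdown=eta)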
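A side note on the tasks.py snippet: `(user)` is not a one-element tuple. In Python, parentheses alone only group an expression; the trailing comma is what creates a tuple, and apply_async expects the task's positional arguments as a tuple or list. The sketch below does not call Celery. It uses a hypothetical stand-in function `fake_task` and a hypothetical string value for `user` to show what happens when such arguments are unpacked with `*args`.

```python
def fake_task(*args):
    """Stand-in for a task body: report how many positional arguments it got."""
    return len(args)


user = "alice"           # hypothetical user value; here a string
not_a_tuple = (user)     # parentheses only group: this is still the string
one_tuple = (user,)      # the trailing comma makes a one-element tuple

print(type(not_a_tuple).__name__)  # str
print(type(one_tuple).__name__)    # tuple
print(fake_task(*not_a_tuple))     # 5: one argument per character
print(fake_task(*one_tuple))       # 1
```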
There is no routing for this task defined so it goes into the default queue: celery. There is one worker processing this queue.
supervisord.conf:
[program:celery]
autostart = true
autorestart = true
command = celery worker -A <django app>.celeryapp:app --concurrency=3 -l INFO -n celery
Unacked messages in RabbitMQ are messages that have been delivered to a consumer but not yet acknowledged. If a consumer does not acknowledge messages, RabbitMQ keeps delivering new messages to it until the number of unacknowledged messages reaches the prefetch count set on the channel.
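A toy model of that flow-control rule, assuming a consumer that never acknowledges anything (a sketch, not RabbitMQ's implementation): the broker keeps delivering while the unacked count is below the prefetch count. In AMQP's basic.qos, a prefetch_count of 0 means no limit. A Celery worker derives its prefetch count from its concurrency multiplied by CELERYD_PREFETCH_MULTIPLIER. The function name `deliver_to_stuck_consumer` is hypothetical.

```python
def deliver_to_stuck_consumer(ready, prefetch_count):
    """Toy model: push `ready` queued messages to a consumer that never acks.

    Delivery stops once the unacked count reaches prefetch_count;
    prefetch_count == 0 means "no limit", as in AMQP basic.qos.
    Returns the number of messages left unacknowledged on the consumer.
    """
    unacked = 0
    while ready > 0 and (prefetch_count == 0 or unacked < prefetch_count):
        ready -= 1      # message leaves the queue...
        unacked += 1    # ...and sits on the consumer, unacknowledged
    return unacked


print(deliver_to_stuck_consumer(1000, 16))  # 16: capped by the prefetch count
print(deliver_to_stuck_consumer(5, 16))     # 5: the queue ran out first
print(deliver_to_stuck_consumer(1000, 0))   # 1000: no prefetch limit
```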
celery-amqp-backend is a rewrite of Celery's original amqp:// result backend, which was removed from Celery in version 5.0. Celery encourages you to use the newer rpc:// result backend instead, as it does not create a new result queue for each task and is therefore faster in many circumstances.
From my understanding, Celery is a distributed task queue, which means the only thing it should do is dispatch tasks/jobs to other servers and get the results back. RabbitMQ is a message queue and nothing more. A worker, however, can simply listen to the queue and execute a task whenever a message arrives.
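The "worker listens to the queue" idea can be sketched entirely in-process, as an illustration only: here the standard library's `queue.Queue` stands in for the broker, `task_done()` plays the role of an acknowledgement, and a `None` message is a hypothetical shutdown sentinel. Celery and RabbitMQ do not work this way internally; the names `worker` and `run_demo` are made up for the example.

```python
import queue
import threading


def worker(q, results):
    """Toy worker: block on the queue, run each task message, then 'ack' it."""
    while True:
        msg = q.get()                  # wait for the next message
        if msg is None:                # sentinel message: shut down
            q.task_done()
            return
        func, args = msg
        results.append(func(*args))    # execute the task
        q.task_done()                  # mark the message as processed (the "ack")


def run_demo():
    q = queue.Queue()                  # stands in for the broker's queue
    results = []
    t = threading.Thread(target=worker, args=(q, results))
    t.start()
    for n in range(5):
        q.put((pow, (n, 2)))           # "dispatch" a task: square n
    q.put(None)
    q.join()                           # returns once every message has been processed
    t.join()
    return results


if __name__ == "__main__":
    print(run_demo())  # [0, 1, 4, 9, 16]
```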
RabbitMQ 3.3 changed how QoS (prefetch) settings behave, which broke older Celery versions. You need to upgrade celery to at least 3.1.11 and kombu to at least 3.0.15 (see their changelogs). Ideally, use the latest versions.
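The minimum versions above can be checked with a small helper. Comparing version strings directly is a trap: as plain strings, "3.1.4" sorts after "3.1.11". This sketch assumes purely numeric dotted versions; `MINIMUMS`, `parse_version` and `meets_minimum` are hypothetical names, not part of Celery or kombu.

```python
# Minimum versions from the answer for use with RabbitMQ 3.3 and later
MINIMUMS = {"celery": "3.1.11", "kombu": "3.0.15"}


def parse_version(v):
    """Turn a purely numeric dotted version such as '3.1.11' into (3, 1, 11)."""
    return tuple(int(part) for part in v.split("."))


def meets_minimum(package, installed):
    """True if `installed` is at least the minimum listed for `package`."""
    return parse_version(installed) >= parse_version(MINIMUMS[package])


print("3.1.4" >= "3.1.11")               # True: plain string comparison is wrong here
print(meets_minimum("celery", "3.1.4"))  # False: the version from the question
print(meets_minimum("kombu", "3.0.15"))  # True
```

To check what is actually installed, `importlib.metadata.version("celery")` (Python 3.8+) returns the installed version string; versions with suffixes such as "rc1" would need a real parser like `packaging.version.Version`.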
I hit this exact same behavior when 3.3 was released. RabbitMQ flipped the default behavior of the prefetch_count setting. Before that, if a worker's eta'd messages reached the CELERYD_PREFETCH_MULTIPLIER limit, the worker would raise the limit in order to fetch more messages. The new default no longer allowed this, so the worker stopped receiving new messages.
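The starvation described in this answer can be illustrated with a toy model (not RabbitMQ's or Celery's actual code): a worker pulls messages in queue order while its count of held, unacknowledged messages is below its prefetch limit. Messages whose ETA has passed are run and acknowledged immediately; messages with a future ETA are kept unacknowledged. The `grow_on_eta` flag is a hypothetical switch that mimics the older behavior of the worker raising its limit for each ETA message it holds.

```python
def run_worker(etas, now, base_limit, grow_on_eta):
    """Toy model of a worker draining a queue of ETA'd messages.

    etas: ETA of each queued message, in queue order (seconds).
    Returns (executed, held): tasks run now, and future-ETA messages held unacked.
    """
    limit = base_limit
    held = 0       # unacked messages waiting for their ETA
    executed = 0
    for eta in etas:
        if held >= limit:
            break  # prefetch window full: the broker stops delivering
        if eta <= now:
            executed += 1  # due task: run and ack right away, freeing its slot
        else:
            held += 1      # future ETA: kept in memory, unacknowledged
            if grow_on_eta:
                limit += 1  # worker raises its own prefetch limit


    return executed, held


week = 60 * 60 * 24 * 7
queue_etas = [week] * 4 + [0] * 3  # four week-away tasks ahead of three due ones
print(run_worker(queue_etas, now=0, base_limit=4, grow_on_eta=False))  # (0, 4)
print(run_worker(queue_etas, now=0, base_limit=4, grow_on_eta=True))   # (3, 4)
```

With a fixed limit, the held ETA messages fill the window and the due tasks behind them never run, which resembles the symptom in the question: a set of unacknowledged messages and no other tasks being executed.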
I had similar symptoms. Messages were getting to the MQ (visible in the charts) but were not picked up by the worker.
This led me to assume that my Django app had set up the Celery app correctly, but that I was missing an import ensuring Celery is configured during Django startup (in the project package's __init__.py):
from __future__ import absolute_import
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app # noqa
It is a silly mistake, but the fact that the messages reached the broker and an AsyncResult was returned got me off track and made me look in the wrong places. Then I noticed that setting CELERY_ALWAYS_EAGER = True didn't do squat: even then, tasks weren't executed at all.
PS: This may not be an answer to @kev's question, but since I ended up here a couple of times while looking for the solution to my problem, I'm posting it for anyone in a similar situation.