I have two custom tasks (TaskA and TaskB), both inheriting from celery.Task. The scheduler launches TaskA every now and then, and TaskA launches TaskB N times, each time with different arguments. But for some reason, the same TaskB, with the same arguments, is sometimes executed twice at the same time, and that causes various problems in the database.
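import celery
# MyModel is the Django model from the question (its import is omitted here)
class TaskA(celery.Task):
    def run(self, *args, **kwargs):
        # ids of all objects that have not been processed yet
        objects = MyModel.objects.filter(processed=False) \
            .values_list('id', flat=True)
        task_b = TaskB()
        for obj_id in objects:
            # queue one TaskB per object id
            task_b.apply_async(args=[obj_id])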
class TaskB(celery.Task):
    def run(self, obj_id, *args, **kwargs):
        obj = MyModel.objects.get(id=obj_id)
        # do some stuff with obj
Things I've tried
I tried using celery.group, hoping it would fix the issue, but all I got were errors saying that run takes 2 arguments and none were provided.
This is how I tried to launch TaskB using celery.group:
# somewhere in TaskA
task_b = TaskB()
g = celery.group([task_b.s(id) for id in objects])
g.apply_async()
I also tried it like this:
# somewhere in TaskA
task_b = TaskB()
g = celery.group([task_b.run(id) for id in objects])
g.apply_async()
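which executed the tasks right away, because task_b.run(id) calls the task's function directly inside the list comprehension, before g.apply_async() is ever reached.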
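The difference between the two attempts is when the call happens: task_b.run(id) executes immediately while the list is being built, whereas task_b.s(id) only builds a signature (the task plus its arguments) that is executed later. The sketch below is a plain-Python analogy, not Celery's API: functools.partial stands in for a signature, and run, calls and objects are hypothetical names used only for illustration.

```python
from functools import partial

calls = []  # records every time the stand-in "run" actually executes


def run(obj_id):
    """Stand-in for TaskB.run: records the call and returns a value."""
    calls.append(obj_id)
    return obj_id * 10


objects = [1, 2, 3]  # hypothetical object ids

# Like [task_b.run(id) for id in objects]: each run() call happens right
# here, so the list holds return values, not anything that can be queued.
eager = [run(obj_id) for obj_id in objects]
ran_while_building_eager = list(calls)

calls.clear()

# Like [task_b.s(id) for id in objects]: only "function + arguments"
# descriptions are built; nothing runs yet.
deferred = [partial(run, obj_id) for obj_id in objects]
ran_while_building_deferred = list(calls)

# Only invoking them later (in Celery, sending the group to the workers)
# actually executes run().
results = [call() for call in deferred]
```

In this analogy, ran_while_building_eager already contains every id, while ran_while_building_deferred is empty, which mirrors why the run() version executed before g.apply_async().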
Question
Does the issue come from how I launch the tasks, or is it something else? Is this normal behaviour?
Additional Info
On my local machine I run Celery 3.1.13 with RabbitMQ 3.3.4; on the server, Celery 3.1.13 runs with Redis 2.8.9.
On my local machine I see no such behaviour: every task is executed exactly once. On the server, anywhere between 1 and 10 such tasks are executed twice in a row.
This is how I run Celery, both on my local machine and on the server:
celery_beat: celery -A proj beat -l info
celery1: celery -A proj worker -Q default -l info --purge -n default_worker -P eventlet -c 50
celery2: celery -A proj worker -Q long -l info --purge -n long_worker -P eventlet -c 200
Workaround that works
I introduced a lock on TaskB keyed on the arguments it received. After about 10 hours of testing, I can see exactly which tasks are being executed twice, but the lock prevents collisions in the database.
This does solve my issue, but I would still like to understand why it is happening.
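The question does not show the lock itself. A minimal sketch of the idea, similar in spirit to the "only one at a time" recipe in the Celery task cookbook, assuming a shared store with atomic add-if-absent semantics (such as a cache add or Redis SET NX). InMemoryLockStore, lock_key, argument_lock and task_b_body are hypothetical names; the in-memory store only works inside one process, so it illustrates the logic but would not stop duplicates across separate worker processes.

```python
import hashlib
import threading
import time
from contextlib import contextmanager


class InMemoryLockStore:
    """Hypothetical stand-in for a shared store with add-if-absent semantics.

    Single-process only: real workers would need a shared backend.
    """

    def __init__(self):
        self._data = {}
        self._mutex = threading.Lock()

    def add(self, key, value, timeout):
        """Store key only if it is absent or expired; return True on success."""
        now = time.monotonic()
        with self._mutex:
            entry = self._data.get(key)
            if entry is not None and entry[1] > now:
                return False
            self._data[key] = (value, now + timeout)
            return True

    def delete(self, key):
        with self._mutex:
            self._data.pop(key, None)


def lock_key(task_name, *args):
    """Derive a lock id from the task name and its arguments."""
    digest = hashlib.sha256(repr(args).encode("utf-8")).hexdigest()
    return "lock-%s-%s" % (task_name, digest)


@contextmanager
def argument_lock(store, task_name, args, timeout=600):
    """Yield True if this caller acquired the lock for these args, else False."""
    key = lock_key(task_name, *args)
    acquired = store.add(key, "locked", timeout)
    try:
        yield acquired
    finally:
        # Only the holder releases the lock; the timeout covers crashed holders.
        if acquired:
            store.delete(key)


STORE = InMemoryLockStore()  # hypothetical module-level store


def task_b_body(obj_id):
    """Hypothetical TaskB.run body guarded by an argument-keyed lock."""
    with argument_lock(STORE, "TaskB", (obj_id,)) as acquired:
        if not acquired:
            return "skipped"  # same obj_id is already being processed
        # ... do some stuff with obj here ...
        return "processed"
```

The timeout matters: if a worker dies while holding the lock, the entry expires instead of blocking that object forever.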
Have you set the fanout_prefix and fanout_patterns transport options, as described in the "Using Redis" section of the Celery documentation? I am using Celery with Redis and I am not experiencing this problem.
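For reference, a sketch of how those two options look in a Celery 3.1-style settings module (uppercase setting names), where both are passed through BROKER_TRANSPORT_OPTIONS. The broker URL is a placeholder, not taken from the question.

```python
# Celery 3.1-style settings for a Redis broker.
BROKER_URL = "redis://localhost:6379/0"  # placeholder URL

BROKER_TRANSPORT_OPTIONS = {
    # Prefix broadcast messages so only the active virtual host receives them.
    "fanout_prefix": True,
    # Let workers subscribe only to worker-related events.
    "fanout_patterns": True,
}
```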