I have two custom tasks (TaskA and TaskB), both of which inherit from celery.Task. A scheduler launches TaskA every now and then, and TaskA launches TaskB N times with different arguments each time. But for some reason, sometimes the same TaskB, with the same arguments, is executed twice at the same time, and that causes different issues with the database.
class TaskA(celery.Task):
    def run(self, *args, **kwargs):
        objects = MyModel.objects.filter(processed=False)\
                                 .values_list('id', flat=True)
        task_b = TaskB()
        for o in objects:
            task_b.apply_async(args=[o])
class TaskB(celery.Task):
    def run(self, obj_id, *args, **kwargs):
        obj = MyModel.objects.get(id=obj_id)
        # do some stuff with obj
Things I've tried
I tried using celery.group in the hope that it would fix such issues, but all I got were errors saying that run takes 2 arguments and none were provided. This is how I tried to launch TaskB using celery.group:
# somewhere in TaskA
task_b = TaskB()
g = celery.group([task_b.s(id) for id in objects])
g.apply_async()
I also tried it like this:
# somewhere in TaskA
task_b = TaskB()
g = celery.group([task_b.run(id) for id in objects])
g.apply_async()
which executed the tasks right there, before g.apply_async() was even called.
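That second attempt behaves as observed because task_b.run(id) invokes the method body directly in the calling process, while task_b.s(id) only builds a signature (the task plus its arguments) that the group later sends to the broker. A rough pure-Python analogy of that distinction, with no Celery involved and all names made up for illustration:

```python
import functools

def make_signature(func, *args):
    """Like task.s(): bundle the callable and its arguments without calling it."""
    return functools.partial(func, *args)

def double(n):
    return n * 2

# task_b.run(3) analogue: the body executes immediately
immediate = double(3)

# task_b.s(3) analogue: nothing runs until a "worker" invokes the signature
sig = make_signature(double, 3)
deferred = sig()  # worker-side invocation happens here, not above
```

So a group built from run() calls receives plain return values instead of signatures, which is why the work had already happened before g.apply_async().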
Question
Does the issue come from how I launch tasks, or is it something else? Is this normal behaviour?
Additional Info
On my local machine I run Celery 3.1.13 with RabbitMQ 3.3.4, and on the server Celery 3.1.13 runs with Redis 2.8.9.
On the local machine I see no such behaviour: every task is executed once. On the server I see anywhere between 1 and 10 such tasks being executed twice in a row.
This is how I run celery on local machine and on server:
celery_beat: celery -A proj beat -l info
celery1: celery -A proj worker -Q default -l info --purge -n default_worker -P eventlet -c 50
celery2: celery -A proj worker -Q long -l info --purge -n long_worker -P eventlet -c 200
Workaround that works
I introduced a lock on TaskB based on the arguments it received. After about 10 hours of testing I can see exactly what is being executed twice, but the lock prevents collisions on the database.
This does solve my issue, but I would still like to understand why it is happening.
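For completeness, that workaround can be sketched as an argument-keyed lock that lets a duplicate delivery of the same TaskB detect the first one and bail out. The sketch below uses an in-process set as the lock store purely for illustration (in a real multi-worker setup the store would have to be shared, e.g. Redis or Django's cache, with a timeout so crashed tasks don't hold locks forever); every name here is hypothetical:

```python
import threading

_locks = set()             # argument keys currently being processed
_guard = threading.Lock()  # protects _locks itself

def acquire(obj_id):
    """Try to claim obj_id; return False if another TaskB already holds it."""
    key = 'taskb-lock-%s' % obj_id
    with _guard:
        if key in _locks:
            return False
        _locks.add(key)
        return True

def release(obj_id):
    with _guard:
        _locks.discard('taskb-lock-%s' % obj_id)

def run_task_b(obj_id):
    if not acquire(obj_id):
        return 'skipped'  # duplicate delivery: do nothing
    try:
        return 'processed %s' % obj_id  # the real work would go here
    finally:
        release(obj_id)
```

With this in place the duplicate delivery still happens, but only one of the two executions touches the database.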
Have you set the fanout_prefix and fanout_patterns as described in the Using Redis documentation for Celery? I am using Celery with Redis and I am not experiencing this problem.