I have written a module that dynamically adds periodic Celery tasks based on a list of dictionaries in the project's settings (imported via django.conf.settings). I do that using a function add_tasks that schedules a function to be called with a specific uuid, which is given in the settings:
def add_tasks(celery):
    for new_task in settings.NEW_TASKS:
        celery.add_periodic_task(
            new_task['interval'],
            my_task.s(new_task['uuid']),
            name='My Task %s' % new_task['uuid'],
        )
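For readers following along, here is a minimal sketch of the settings shape that add_tasks appears to expect. Only the 'interval' and 'uuid' keys come from the code above; the concrete values and the helper periodic_task_names are hypothetical and exist purely for illustration.

```python
# Hypothetical example values; only the 'interval' and 'uuid' keys are taken
# from the question. The interval is assumed to be a number of seconds.
NEW_TASKS = [
    {'interval': 60, 'uuid': '11111111-1111-1111-1111-111111111111'},
    {'interval': 300, 'uuid': '22222222-2222-2222-2222-222222222222'},
]


def periodic_task_names(new_tasks):
    """Build the same schedule names that add_tasks passes as name=..."""
    return ['My Task %s' % new_task['uuid'] for new_task in new_tasks]


names = periodic_task_names(NEW_TASKS)
for name in names:
    print(name)
```

Each dictionary produces one schedule entry, so the uuid has to be unique per entry for the names not to collide.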
As suggested here, I use the on_after_configure.connect signal to call the function in my celery.py:
app = Celery('my_app')

@app.on_after_configure.connect
def setup_periodic_tasks(celery, **kwargs):
    from add_tasks_module import add_tasks
    add_tasks(celery)
This setup works fine for both celery beat and celery worker, but it breaks my setup where I use uwsgi to serve my Django application. uwsgi runs smoothly until the first time the view code sends a task using Celery's .delay() method. At that point it seems that Celery is initialized in uwsgi but blocks forever in the above code. If I run this manually from the command line and interrupt it when it blocks, I get the following (shortened) stack trace:
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
    return obj.__dict__[self.__name__]
KeyError: 'tasks'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
    return obj.__dict__[self.__name__]
KeyError: 'data'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
    return obj.__dict__[self.__name__]
KeyError: 'tasks'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  (SHORTENED HERE. Just contained the trace from the console through my call to this function)
  File "/opt/my_app/add_tasks_module/__init__.py", line 42, in add_tasks
    my_task.s(new_task['uuid']),
  File "/usr/local/lib/python3.6/site-packages/celery/local.py", line 146, in __getattr__
    return getattr(self._get_current_object(), name)
  File "/usr/local/lib/python3.6/site-packages/celery/local.py", line 109, in _get_current_object
    return loc(*self.__args, **self.__kwargs)
  File "/usr/local/lib/python3.6/site-packages/celery/app/__init__.py", line 72, in task_by_cons
    return app.tasks[
  File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 44, in __get__
    value = obj.__dict__[self.__name__] = self.__get(obj)
  File "/usr/local/lib/python3.6/site-packages/celery/app/base.py", line 1228, in tasks
    self.finalize(auto=True)
  File "/usr/local/lib/python3.6/site-packages/celery/app/base.py", line 507, in finalize
    with self._finalize_mutex:
It seems like there is a problem with acquiring a mutex.
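One possible reading of the trace, offered as an assumption rather than a confirmed diagnosis: if the finalize mutex is a plain non-reentrant lock and finalize is entered again on the same thread while that lock is already held, the second acquisition can never succeed, even with no other thread or process involved. The standalone sketch below uses only the standard library, not Celery, and try_reenter is a hypothetical helper. It contrasts threading.Lock with threading.RLock and uses a timeout so the demonstration does not hang.

```python
import threading


def try_reenter(lock, timeout=0.1):
    """Hold `lock`, then try to acquire it again from the same thread.

    Returns True if the second acquisition succeeds, False if it times out.
    """
    with lock:
        reacquired = lock.acquire(timeout=timeout)
        if reacquired:
            # Undo the extra acquisition so the `with` block can exit cleanly.
            lock.release()
        return reacquired


# A plain Lock cannot be taken twice by the same thread.
plain_result = try_reenter(threading.Lock())
# An RLock tracks its owner and allows nested acquisition.
reentrant_result = try_reenter(threading.RLock())

print(plain_result, reentrant_result)
```

Without the timeout, the plain-lock case would block forever, which would look much like the hang described above.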
Currently I am using a workaround that detects whether sys.argv[0] contains uwsgi and, if so, does not add the periodic tasks, as only beat needs them. However, I would like to understand what is going wrong here so I can solve the problem more permanently.
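The workaround described above could look roughly like the sketch below. The function name running_under_uwsgi and its argv parameter are hypothetical; the only thing taken from the description is the check for "uwsgi" in sys.argv[0].

```python
import sys


def running_under_uwsgi(argv=None):
    """Return True if the process appears to have been started by uwsgi.

    `argv` defaults to sys.argv; it is a parameter only to make the check
    easy to exercise without actually running under uwsgi.
    """
    argv = sys.argv if argv is None else argv
    if not argv:
        return False
    return 'uwsgi' in argv[0]


# In the signal handler, the periodic tasks would then only be added when
# running_under_uwsgi() returns False.
print(running_under_uwsgi(['/usr/local/bin/uwsgi', '--ini', 'app.ini']))
print(running_under_uwsgi(['/usr/local/bin/celery', 'beat']))
```

This is a heuristic: it depends on how the process was launched, which is part of why a more permanent fix is preferable.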
Could this problem have something to do with using uwsgi multi-threaded or multi-processed where one thread/process holds the mutex the other needs?
I'd appreciate any hints that can help me solve the problem. Thank you.
I am using: Django 1.11.7 and Celery 4.1.0
Edit 1
I have created a minimal setup for this problem:
celery.py:
import os
from celery import Celery
from django.conf import settings
from myapp.tasks import my_task

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_app.settings')

app = Celery('my_app')

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(
        60,
        my_task.s(),
        name='Testtask'
    )

app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
tasks.py:
from celery import shared_task

@shared_task()
def my_task():
    print('ran')
Make sure that CELERY_TASK_ALWAYS_EAGER=False and that you have a working message queue.
Run:
./manage.py shell -c 'from myapp.tasks import my_task; my_task.delay()'
Wait about 10 seconds before interrupting to see the above error.
So, I have found out that the @shared_task decorator causes the problem. I can circumvent it by declaring the task right in the function called by the signal, like so:
def add_tasks(celery):
    @celery.task
    def my_task(uuid):
        print(uuid)

    for new_task in settings.NEW_TASKS:
        celery.add_periodic_task(
            new_task['interval'],
            my_task.s(new_task['uuid']),
            name='My Task %s' % new_task['uuid'],
        )
This solution actually works for me, but I have one more problem with it: I use this code in a pluggable app, so I can't directly access the Celery app outside of the signal handler, yet I would also like to be able to call the my_task function from other code. Because it is defined inside the function, it is not available outside of it, so I cannot import it anywhere else.
I can probably work around this by defining the task function outside of the signal function and wrapping it with different decorators here and in the tasks.py. I am wondering, though, whether there is a decorator other than @shared_task that I can use in the tasks.py without triggering the problem.
The current best solution could be:
task_app.__init__.py:
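from django.conf import settings

def my_task(uuid):
    # do stuff
    print(uuid)

def add_tasks(celery):
    # Register the plain function as a task on the app passed in by the signal.
    celery_my_task = celery.task(my_task)
    for new_task in settings.NEW_TASKS:
        celery.add_periodic_task(
            new_task['interval'],
            # .s() builds a signature; calling the task directly would run it here.
            celery_my_task.s(new_task['uuid']),
            name='My Task %s' % new_task['uuid'],
        )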
task_app.tasks.py:
from celery import shared_task
from task_app import my_task
shared_my_task = shared_task(my_task)
myapp.celery.py:
import os
from celery import Celery
from django.conf import settings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_app.settings')

app = Celery('my_app')

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    from task_app import add_tasks
    add_tasks(sender)

app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)