
Celery add_periodic_task blocks Django running in uwsgi environment

I have written a module that dynamically adds periodic Celery tasks based on a list of dictionaries in the project's settings (imported via django.conf.settings). I do that using a function add_tasks, which schedules a function to be called with a specific uuid given in the settings:

def add_tasks(celery):
    for new_task in settings.NEW_TASKS:
        celery.add_periodic_task(
            new_task['interval'],
            my_task.s(new_task['uuid']),
            name='My Task %s' % new_task['uuid'],
        )
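For clarity, settings.NEW_TASKS is a list of dicts with interval and uuid keys, along these lines (the values here are made up):

NEW_TASKS = [
    {'interval': 60.0, 'uuid': '11111111-2222-3333-4444-555555555555'},
    {'interval': 300.0, 'uuid': '66666666-7777-8888-9999-000000000000'},
]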

As suggested here, I use the on_after_configure signal to call the function in my celery.py:

app = Celery('my_app')

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    from add_tasks_module import add_tasks
    add_tasks(sender)

This setup works fine for both celery beat and celery worker, but it breaks my setup where I use uWSGI to serve my Django application. uWSGI runs smoothly until the first time the view code sends a task using Celery's .delay() method. At that point Celery appears to be initialized inside uWSGI but blocks forever in the above code. If I run this manually from the command line and interrupt it when it blocks, I get the following (shortened) stack trace:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
    return obj.__dict__[self.__name__]
KeyError: 'tasks'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
    return obj.__dict__[self.__name__]
KeyError: 'data'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
    return obj.__dict__[self.__name__]
KeyError: 'tasks'

During handling of the above exception, another exception occurred:
Traceback (most recent call last):

  (SHORTENED HERE. Just contained the trace from the console through my call to this function)

  File "/opt/my_app/add_tasks_module/__init__.py", line 42, in add_tasks
    my_task.s(new_task['uuid']),
  File "/usr/local/lib/python3.6/site-packages/celery/local.py", line 146, in __getattr__
    return getattr(self._get_current_object(), name)
  File "/usr/local/lib/python3.6/site-packages/celery/local.py", line 109, in _get_current_object
    return loc(*self.__args, **self.__kwargs)
  File "/usr/local/lib/python3.6/site-packages/celery/app/__init__.py", line 72, in task_by_cons
    return app.tasks[
  File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 44, in __get__
    value = obj.__dict__[self.__name__] = self.__get(obj)
  File "/usr/local/lib/python3.6/site-packages/celery/app/base.py", line 1228, in tasks
    self.finalize(auto=True)
  File "/usr/local/lib/python3.6/site-packages/celery/app/base.py", line 507, in finalize
    with self._finalize_mutex:

It seems there is a problem with acquiring a mutex: judging by the last frames, my_task.s() forces Celery to finalize the app, and the process then blocks trying to acquire self._finalize_mutex.

Currently I am working around this by checking whether sys.argv[0] contains uwsgi and, if so, not adding the periodic tasks (only beat needs them), but I would like to understand what is going wrong here so I can solve the problem more permanently.
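A minimal sketch of that workaround, assuming the check sits in celery.py around the signal handler:

import sys

# Only register the periodic tasks when not running under uWSGI;
# beat is the only process that actually needs them.
if 'uwsgi' not in sys.argv[0]:
    @app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        from add_tasks_module import add_tasks
        add_tasks(sender)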

Could this problem have something to do with uWSGI running multiple threads or processes, where one thread or process holds the mutex that another one needs?

I'd appreciate any hints that can help me solve the problem. Thank you.

I am using Django 1.11.7 and Celery 4.1.0.

Edit 1

I have created a minimal setup for this problem:

celery.py:

import os
from celery import Celery
from django.conf import settings
from myapp.tasks import my_task

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_app.settings')

app = Celery('my_app')

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(
        60,
        my_task.s(),
        name='Testtask'
    )

app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

tasks.py:

from celery import shared_task

@shared_task()
def my_task():
    print('ran')

Make sure that CELERY_TASK_ALWAYS_EAGER=False and that you have a working message queue.
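For example, in settings.py (the broker URL below is only an assumption; any working broker will do):

# Read by Celery via config_from_object(..., namespace='CELERY')
CELERY_TASK_ALWAYS_EAGER = False
CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'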

Run:

./manage.py shell -c 'from myapp.tasks import my_task; my_task.delay()'

Wait about 10 seconds before interrupting to see the above error.

asked Nov 07 '22 by Tim

1 Answer

So, I have found out that the @shared_task decorator creates the problem: calling my_task.s() on the lazy proxy it returns forces the app to finalize, which appears to be exactly where the traceback above blocks on the mutex. I can circumvent the problem by declaring the task right in the function called by the signal, like so:

def add_tasks(celery):
    @celery.task
    def my_task(uuid):
        print(uuid)

    for new_task in settings.NEW_TASKS:
        celery.add_periodic_task(
            new_task['interval'],
            my_task.s(new_task['uuid']),
            name='My Task %s' % new_task['uuid'],
        )

This solution actually works for me, but it leaves one more problem: I use this code in a pluggable app, so I cannot access the Celery app outside of the signal handler, yet I would also like to be able to call my_task from other code. Since it is defined inside the function, it is not available outside of it and cannot be imported anywhere else.

I can probably work around this by defining the task function outside of the signal function and wrapping it with different decorators here and in tasks.py. I am wondering, though, whether there is a decorator other than @shared_task that I can use in tasks.py that does not create the problem.

The current best solution could be:

task_app/__init__.py:

def my_task(uuid):
    # do stuff
    print(uuid)

def add_tasks(celery):
    celery_my_task = celery.task(my_task)
    for new_task in settings.NEW_TASKS:
        celery.add_periodic_task(
            new_task['interval'],
            celery_my_task.s(new_task['uuid']),
            name='My Task %s' % new_task['uuid'],
        )

task_app/tasks.py:

from celery import shared_task
from task_app import my_task

shared_my_task = shared_task(my_task)

myapp/celery.py:

import os
from celery import Celery
from django.conf import settings


# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_app.settings')

app = Celery('my_app')

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    from task_app import add_tasks
    add_tasks(sender)


app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
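With this split, other code can import the shared variant from task_app.tasks and queue it like any other task. A hypothetical usage sketch (the uuid value is made up):

from task_app.tasks import shared_my_task

# Queue the task from anywhere in the project, e.g. a view:
shared_my_task.delay('11111111-2222-3333-4444-555555555555')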
answered Nov 14 '22 by Tim