I am trying to build an application on top of Celery framework.
I have a module settings/celery_settings.py with the code that initializes Celery application like this (I expand some variables):
from __future__ import absolute_import
from celery import Celery
pfiles = ['other_tasks.test123', 'balance_log.balance_log']
app = Celery('myapp')
# here I just have some parameters defined like broker, result backend, etc
# app.config_from_object(settings)
# TRYING to discover tasks
app.autodiscover_tasks(pfiles)
Files other_tasks/test123.py and balance_log/balance_log.py contain task definitions like these:
# file other_tasks/test123.py
from celery import shared_task, Task
@shared_task()
def mytask():
print("Test 1234!")
class TestTask01(Task):
def run(self, client_id=None):
logger.debug("TestTask01: run")
return client_id
I run celery worker:
python3 /usr/local/bin/celery -A settings.celery_settings worker
And this way it can discover tasks. I can call these tasks.
But then I try to use IPython:
In [1]: from settings.celery_settings import app
In [2]: app.tasks
Out[2]:
{'celery.backend_cleanup': <@task: celery.backend_cleanup of XExchange:0x7f9f50267ac8>,
'celery.chain': <@task: celery.chain of XExchange:0x7f9f50267ac8>,
'celery.chord': <@task: celery.chord of XExchange:0x7f9f50267ac8>,
'celery.chord_unlock': <@task: celery.chord_unlock of XExchange:0x7f9f50267ac8>,
'celery.chunks': <@task: celery.chunks of XExchange:0x7f9f50267ac8>,
'celery.group': <@task: celery.group of XExchange:0x7f9f50267ac8>,
'celery.map': <@task: celery.map of XExchange:0x7f9f50267ac8>,
'celery.starmap': <@task: celery.starmap of XExchange:0x7f9f50267ac8>}
And apparently it does not discover the tasks.
It seems that when I call tasks explicitly, I first import them and specify exact path for the celery while call. That's why it works.
Question: how do I make them discovered to have a list of known tasks?
Finally I figured out that there is an additional parameter for autodiscover_tasks function:
def autodiscover_tasks(self, packages, related_name='tasks', force=False):
...
So, after setting force=True it turned to work!
app.autodiscover_tasks(pfiles, force=True)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With