I'm upgrading my Celery workers from Celery v3 to Celery v4, and all my tasks are class-based tasks. I have manually registered the tasks, since that's what the upgrade doc indicates.
The problem is in task routing. I have the following task:
class RegisterTask(Task):
    routing_key = 'app_server.register'

    def run(self, **params):
        whatever ...
I'm running two celery workers, one on the default queue, and the other on the register queue, like below:
# Default Worker
celery -A app_server worker --loglevel=info --concurrency=1
# Register Worker
celery -A app_server worker -Q app_server_register --loglevel=info --concurrency=1
And here are my queue definitions:
CELERY_TASK_DEFAULT_QUEUE = 'app_server_default'
CELERY_TASK_DEFAULT_ROUTING_KEY = 'app_server.default'
CELERY_TASK_QUEUES = (
    Queue('app_server_default', routing_key='app_server.default'),
    Queue('app_server_register', routing_key='app_server.register'),
)
The unexpected behavior is the difference I see when I call the task using Celery V3 and Celery V4.
# Celery V3
RegisterTask().delay(**params)
# task is consumed by the register worker!
# Celery V4
RegisterTask().delay(**params)
# task is consumed by the default worker!
I want the task to be consumed by the register worker (the Celery v3 behavior), which is why I hardcoded the routing_key attribute in the class-based task. But Celery v4 seems to ignore the routing_key attribute on the class-based task.
[I'm also using Redis as the broker, in case that matters.]
Any ideas on this issue?
Thanks!
Task execution in Celery can be broken down into three steps: your application sends the task to the broker, a worker reserves it for execution, and finally the result of the task is stored in the result backend.
The shared_task decorator lets reusable apps define Celery tasks without needing a concrete Celery app instance. It is also an easier way to define a task, since you don't need to import the app instance.
apply_async(args[, kwargs[, …]]): Sends a task message.
delay(*args, **kwargs): Shortcut to send a task message, but doesn't support execution options.
calling (__call__)
routing_key as a class attribute is deprecated in v4. You can supply it to a task on invocation as a kwarg to apply_async, or you can manually configure the routing of task types to queues in the Celery config.
See the docs.
The Task class at celery.task.base.Task is deprecated in favor of celery.app.task.Task.
This is a design change where the new Task class prefers to bind configuration when the task is used as opposed to at instantiation time. See this comment
Looking through the source, you can see that routing_key is set when the task is bound, using _get_exec_options. Here the options are set using an instance (self), not the class.
I've gathered this by looking at the Celery source code; if the docs don't mention the deprecation, I guess they're out of date.
Perhaps filing an issue on their GitHub and using one of the options provided in the new paradigm is the best option going forward.
The docs also say that a task's routing_key will be read from task metadata. So class-based tasks can't carry this metadata anymore?
Correct, the metadata isn't on the class object anymore. It is set via config or lazily via a task instance when called.
The config file should be the go-to for static routing, while setting options on the task call should be used to override the configured default.
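For the static side, a minimal sketch of a route entry might look like the following. It assumes the same CELERY_-prefixed settings style used in the question. The task name used as the key is hypothetical and must match whatever name the task was actually registered under.

```python
# Settings-module sketch: map a task name to the queue and routing key
# already declared in CELERY_TASK_QUEUES in the question.
CELERY_TASK_ROUTES = {
    # Hypothetical registered task name; replace with the real one.
    'app_server.register_task': {
        'queue': 'app_server_register',
        'routing_key': 'app_server.register',
    },
}
```

With an entry like this in place, a plain delay() call would be routed by the config, and options passed to apply_async() remain available for one-off overrides.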