Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Celery v4 not routing tasks as expected

I'm updating my celery workers from celery v3 to celery v4, and all my tasks are Class Based Tasks. I have manually registered the tasks since it's what indicated in the upgrade doc.

The problem is in task routing, I have the following Task:

class RegisterTask(Task):
    routing_key = 'app_server.register'

    def run(**params):
        whatever ...

I'm running two celery workers, one on the default queue, and the other on the register queue, like below:

# Default Worker
celery -A app_server worker --loglevel=info --concurrency=1

# Register Worker
celery -A app_server worker -Q app_server_register --loglevel=info --concurrency=1

and here's my queues definition:

CELERY_TASK_DEFAULT_QUEUE = 'app_server_default'
CELERY_TASK_DEFAULT_ROUTING_KEY = 'app_server.default'

CELERY_TASK_QUEUES = (
    Queue('app_server_default', routing_key='app_server.default'),
    Queue('app_server_register', routing_key='app_server.register')
)

The unexpected behavior is the difference I see when I call the task using Celery V3 and Celery V4.

# Celery V3
RegisterTask().delay(**params)
# task is consumed by the register worker!

# Celery V4
RegisterTask().delay(**params)
# task is consumed by the default worker!

And I want the task to be consumed by the register worker (celery v3 behavior), hence why I hardcoded the routing_key attribute in the class based task. But Celery V4 seems to ignore the routing_key attribute in the class based task.

[I'm also using redis as the broker, if it's any important]

Any Ideas on this issue?

Thanks!

like image 784
SpiXel Avatar asked Feb 24 '19 06:02

SpiXel


People also ask

How does Celery execute tasks?

Process of Task Execution by Celery can be broken down into:Your application sends the tasks to the task broker, it is then reserved by a worker for execution & finally the result of task execution is stored in the result backend.

What is Shared_task in Celery?

The "shared_task" decorator allows creation of Celery tasks for reusable apps as it doesn't need the instance of the Celery app. It is also easier way to define a task as you don't need to import the Celery app instance.

What is Apply_async in Celery?

apply_async(args[, kwargs[, …]]) Sends a task message. delay(*args, **kwargs) Shortcut to send a task message, but doesn't support execution options. calling ( __call__ )


1 Answers

routing_key as a class attribute is deprecated in v4.

You can supply it to a task on invocation as a kwarg to apply_async or you can manually configure routing task types to queues in celery config. See the docs


The Task class at celery.task.base.Task is deprecated for celery.app.task.Task.

This is a design change where the new Task class prefers to bind configuration when the task is used as opposed to at instantiation time. See this comment

Looking through the source you can see routing_key is set when the task is bound using _get_exec_options. Here the options are set using an instance (self) not the class.

I've gathered this by looking at the celery source code but if there's no mention of deprecation in the docs I guess they're out of date.

Perhaps filling an issue on their github and using one of the options provided in the new paradigm is the best option going forward.


Also there's in the doc that tasks routing_key will be read from task metadata, so class based tasks can't have this metadata anymore?

Correct, the metadata isn't on the class object anymore. It is set via config or lazily via a task instance when called.

The config file should be the goto for static routing while setting on task call should be used for overriding the configured default.

like image 173
stacksonstacks Avatar answered Oct 02 '22 22:10

stacksonstacks