
Celery task routes not working as expected

Tags:

python

celery

I am practicing with Celery and want to assign my task to a specific queue, but it does not work as expected.

My __init__.py

import os
import sys
from celery import Celery

CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))

sys.path.append(CURRENT_DIR)

app = Celery()
app.config_from_object('celery_config')

My celery_config.py

amqp = 'amqp://guest:guest@localhost:5672//'
broker_url = amqp
result_backend = amqp

task_routes = ([
    ('import_feed', {'queue': 'queue_import_feed'})
])

My tasks.py

from . import app

@app.task(name='import_feed')
def import_feed():
    pass

How I run my worker:

celery -A subscriber1.tasks worker -l info

My client's __init__.py :

import os
import sys
from celery import Celery

CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))

sys.path.append(CURRENT_DIR)

app = Celery()
app.config_from_object('celery_config')

My client's celery_config.py:

from kombu.common import Broadcast

amqp = 'amqp://guest:guest@localhost:5672//'
BROKER_URL = amqp
CELERY_RESULT_BACKEND = amqp

Then in my client's shell I tried:

from publisher import app
result = app.send_task('import_feed')

My worker then received the task, which I did not expect because I had assigned the task to a specific queue. I then tried the command below in my client, and my worker did not receive the task, although this is the call I expected it to receive rather than the first one:

result = app.send_task('import_feed', queue='queue_import_feed')

It seems I have misunderstood something about routing. What I really want is for the import_feed task to run only if the queue_import_feed queue is specified when the task is sent.

Dean Christian Armada asked Sep 04 '17 06:09


1 Answer

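Your worker was started without -Q, so it consumes only from the default queue, which is named celery. You can change which queues the worker consumes from.

app.send_task('import_feed') sends the task to the celery queue: routing is resolved by the sender, and the client's celery_config.py defines no task_routes.

app.send_task('import_feed', queue='queue_import_feed') sends the task to queue_import_feed, but your worker only consumes from the celery queue.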
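
The two send_task calls can be modelled with a small sketch. This is not Celery's own code: resolve_queue, client_routes and worker_routes are hypothetical names that only mimic the rule that the sending app's configuration picks the queue, with celery as the default queue name.

```python
# Simplified model of how the sending side chooses a queue.
# Illustration only; this is not Celery's implementation.
DEFAULT_QUEUE = 'celery'  # Celery's default queue name


def resolve_queue(task_name, sender_routes, queue=None):
    """Return the name of the queue a message would be published to."""
    if queue is not None:
        # An explicit queue= argument takes precedence.
        return queue
    route = sender_routes.get(task_name)
    if route is not None:
        # A matching entry in the sender's task_routes.
        return route['queue']
    # No explicit queue and no route on the sender: use the default.
    return DEFAULT_QUEUE


# The client's celery_config.py defines no task_routes.
client_routes = {}
# The worker's config has a route, but the client never reads it.
worker_routes = {'import_feed': {'queue': 'queue_import_feed'}}

print(resolve_queue('import_feed', client_routes))
print(resolve_queue('import_feed', client_routes, queue='queue_import_feed'))
```

Under these assumptions the first call resolves to celery and the second to queue_import_feed, which matches what you observed.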

To have the worker consume from specific queues, use the -Q option:

celery -A subscriber1.tasks worker -l info -Q 'queue_import_feed'
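
As a rough illustration of what -Q changes, the sketch below models which published messages reach a worker. It assumes the worker has no task_queues setting, as in the question, and consumed_queues and worker_receives are hypothetical helper names, not part of Celery.

```python
# Simplified model of which queues a worker listens on.
# Illustration only; this is not Celery's implementation.

def consumed_queues(q_option=None, default_queue='celery'):
    """Queues the worker consumes from: the -Q list if given, else the default."""
    if q_option:
        return set(q_option.split(','))
    return {default_queue}


def worker_receives(published_to, q_option=None):
    """True if a message published to `published_to` reaches the worker."""
    return published_to in consumed_queues(q_option)


# Compare a worker started without -Q and one started with -Q queue_import_feed.
for q_option in (None, 'queue_import_feed'):
    for published_to in ('celery', 'queue_import_feed'):
        print(q_option, published_to, worker_receives(published_to, q_option))
```

In this model a worker started with -Q queue_import_feed ignores messages published to the celery queue, so a plain send_task('import_feed') from the client no longer reaches it.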

Edit

To restrict send_task so that a worker reacts to the import_feed task only when it is published with an explicit queue, you need to override send_task on Celery and also provide a custom AMQP class with default_queue set to None.

reactor.py

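from celery.app.amqp import AMQP
from celery import Celery


class MyCelery(Celery):
    def send_task(self, name=None, args=None, kwargs=None, **options):
        # Publish only when the caller names a queue explicitly.
        if 'queue' in options:
            return super(MyCelery, self).send_task(name, args, kwargs, **options)
        # Otherwise nothing is published and None is returned.


class MyAMQP(AMQP):
    # Intended to leave messages without an explicit queue with no fallback queue.
    default_queue = None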

celery_config.py

from kombu import Exchange, Queue

...

task_exchange = Exchange('default', type='direct')
task_create_missing_queues = False

task_queues = [
    Queue('feed_queue', task_exchange, routing_key='feeds'),
]

task_routes = {
    'import_feed': {'queue': 'feed_queue', 'routing_key': 'feeds'}
}

__init__.py

celeree = MyCelery(amqp='reactor.MyAMQP')
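
One consequence of the send_task override is worth seeing in isolation: a call without queue publishes nothing and returns None instead of a result object. The sketch below shows this with a stand-in base class so it runs without a broker; FakeApp and GuardedApp are hypothetical names, and the returned string merely stands in for the result object.

```python
class FakeApp:
    """Hypothetical stand-in for celery.Celery that records published messages."""

    def __init__(self):
        self.published = []

    def send_task(self, name=None, args=None, kwargs=None, **options):
        # Record the task name and target queue instead of contacting a broker.
        self.published.append((name, options.get('queue')))
        return 'result-for-' + name


class GuardedApp(FakeApp):
    """Same guard as MyCelery: forward the call only if a queue is named."""

    def send_task(self, name=None, args=None, kwargs=None, **options):
        if 'queue' in options:
            return super().send_task(name, args, kwargs, **options)
        return None  # nothing is published


app = GuardedApp()
dropped = app.send_task('import_feed')
sent = app.send_task('import_feed', queue='feed_queue')
print(dropped, sent, app.published)
```

Callers that go on to use the return value, for example by calling get() on it, would need to handle the None case.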

Oluwafemi Sule answered Oct 24 '22 12:10