
Publish Messages to Exchanges in RabbitMQ with Celery

I have written a Django project which queues different tasks asynchronously via my_task.delay(). The problem is that as the project grows it becomes really difficult to route the tasks properly - I started writing tasks whose only purpose is to combine other tasks, which makes the code messy over time.

While reading the RabbitMQ documentation I came across a solution that relies on exchanges and could structure my project a lot better. An exchange can publish messages to multiple queues, from which consumers can then consume them; in short:

Publish/Subscribe in RabbitMQ Documentation

The RabbitMQ documentation describes a solution using Pika, which is a lower-level RabbitMQ client than Celery.

The Celery documentation mentions this scenario but doesn't explain how to create a producer whose messages are sent to an exchange that distributes them to various queues, as shown in the picture above. It only describes how to send tasks to queues directly - but I want this to be handled by the exchange.

I found that Kombu, which Celery uses under the hood, can send messages to an exchange via a Producer, but I can't find any documentation on how to use this in Celery with Django.
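In plain Kombu, publishing to an exchange seems to look roughly like this (the broker URL, exchange name, and payload here are just placeholders for illustration):

from kombu import Connection, Exchange, Producer

# illustrative values only
broker_url = "amqp://guest:guest@localhost:5672//"
logs_exchange = Exchange("logs", type="fanout", durable=True)

with Connection(broker_url) as conn:
    producer = Producer(conn.channel(), exchange=logs_exchange)
    # declare the exchange on first use, then publish to all bound queues
    producer.publish(
        {"event": "user_created"},
        declare=[logs_exchange],
        routing_key="",
    )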

How can I achieve the described procedure in Celery?

PS: There is already a similar question on StackOverflow which advises using Celery primitives like Chain and Group, but in my understanding this contradicts the exchange paradigm.

asked Dec 19 '22 by capitalg
1 Answer

To get good routing of tasks you should create more queues. By default Celery uses a single exchange with direct bindings to queues. By setting up several queues you can split up the work. You can then start additional workers that only consume from certain queues, so the queues with the most work are processed faster.
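For example, a minimal sketch of such a split (the queue and task names here are made up, not from the question):

from kombu import Queue

# two queues on Celery's default direct exchange; heavy work gets its own queue
task_default_queue = "default"
task_queues = [
    Queue("default", routing_key="default"),
    Queue("heavy", routing_key="heavy"),
]

A worker started with celery -A my_proj worker -Q heavy will then only consume the heavy queue, while celery -A my_proj worker -Q default handles everything else, so a backlog of heavy tasks does not block the rest.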

Take a look at how Sentry solves it: https://github.com/getsentry/sentry/blob/master/src/sentry/conf/server.py#L467

Also, if you really want to use several exchanges, you can set up additional exchanges in your settings file and define on task_queues which exchange is used for which queue. Stick to direct exchanges in Celery so you are able to switch to other brokers if needed.

import kombu

first_exchange = kombu.Exchange(name="first", type="direct")
second_exchange = kombu.Exchange(name="second", type="direct")
task_queues = [
    kombu.Queue(
        name="queue1",
        exchange=first_exchange,
        routing_key="queue1",
    ),
    kombu.Queue(
        name="queue2",
        exchange=second_exchange,
        routing_key="queue2",
    ),
]
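To actually send tasks to these queues you would then add routes for them, roughly like this (the task paths are made up for illustration):

task_routes = {
    "my_app.tasks.first_task": {
        "queue": "queue1",
        "exchange": "first",
        "routing_key": "queue1",
    },
    "my_app.tasks.second_task": {
        "queue": "queue2",
        "exchange": "second",
        "routing_key": "queue2",
    },
}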

I ended up on this question several times while trying to post messages from Celery tasks to another exchange that is not used by Celery. I thought I would share my findings in case someone else ends up here with the same question.

This uses Celery 4.3 and not django-celery, which is no longer needed in Django.

I have a Django application that, besides using Celery, sends "regular" AMQP messages over RabbitMQ to other smaller integration applications and to customers.

So from within a Celery task I would like to post to an exchange that is separate from the one used for Celery tasks, and those messages are not tasks.

My initial solution, just to get it working, was to create a new connection in each task. But this did not seem very scalable, since I would end up with a lot of connections if my application was handling a lot of concurrent tasks. It was also annoying to import the Django settings for my AMQP connection string everywhere I needed a new connection.

Instead I started looking into whether I could somehow get the current connection from Celery and reuse it to publish to RabbitMQ, preferably using connection pools as I do in my non-Django consumers and producers.

It turns out that the connection and producer pools are easily available.

Initial setup of my external messaging in celery.py:

import kombu
from celery import Celery
from django.conf import settings
from importlib import import_module

app = Celery("my_proj")
setting_object = import_module(settings.CELERY_CONF_MODULE)
app.config_from_object(setting_object)
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

# Borrow a broker connection from Celery's connection pool to declare
# the external exchange and queue once, at startup.
with app.pool.acquire(block=True) as conn:
    exchange = kombu.Exchange(
        name=settings.AMQP_PUBLISH_TO, type="topic", durable=True, channel=conn
    )
    exchange.declare()

    queue = kombu.Queue(
        name="my_queue",
        exchange=exchange,
        routing_key="my_queue.#",
        channel=conn,
    )
    queue.declare()

In my Celery tasks I use current_app, since the task code runs on the workers.

from celery import current_app, task
from django.conf import settings


@task
def my_task(attrs):
    # do something, then build the message payload to publish
    body = {"attrs": attrs}
    with current_app.producer_pool.acquire(block=True) as producer:
        producer.publish(
            body,
            routing_key="my_queue.test",
            exchange=settings.AMQP_PUBLISH_TO,
            retry=True,
        )

This works really well for me. But you can't really use CELERY_ALWAYS_EAGER. I had problems with that, since the connections are not really used then, so my tests needed to be written a bit more carefully, but that was OK.
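One way to handle that in tests (a sketch of my own, not from the original answer; it assumes pytest with Django settings configured, and the hypothetical module path my_app.tasks) is to mock out the producer pool so the publish call is asserted rather than actually sent:

from unittest import mock

from my_app.tasks import my_task


def test_my_task_publishes_message():
    # replace the current_app imported in the tasks module with a mock
    with mock.patch("my_app.tasks.current_app") as fake_app:
        producer = fake_app.producer_pool.acquire.return_value.__enter__.return_value
        my_task({"id": 1})  # calling the task directly runs it synchronously
        producer.publish.assert_called_once()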

answered Dec 20 '22 by Krolken