In-Memory broker for celery unit tests

I have a REST API written in Django, with an endpoint that queues a Celery task when posted to. The response contains the task id, which I'd like to use to test that the task is created and to get its result. So I'd like to do something like:

    def test_async_job(self):
        response = self.client.post("/api/jobs/", some_test_data, format="json")
        task_id = response.data['task_id']
        result = my_task.AsyncResult(task_id).get()
        self.assertEqual(result, ...)

I obviously don't want to have to run a Celery worker to run the unit tests; I expect to mock it somehow. I can't use CELERY_ALWAYS_EAGER because that seems to bypass the broker altogether, preventing me from using AsyncResult to get the task by its id (as stated here).

Going through the Celery and Kombu docs, I've found that there is an in-memory transport for unit tests that would do what I'm looking for. I tried overriding the BROKER_URL setting to use it in the tests:

    @override_settings(BROKER_URL='memory://')
    def test_async_job(self):
        ...

But the behavior is the same as with the AMQP broker: the test blocks waiting for the result. Any idea how I am supposed to configure this broker to get it working in the tests?

Facundo Olano asked Mar 06 '14 19:03




2 Answers

You can specify BROKER_BACKEND in your settings:

    import sys

    if 'test' in sys.argv[1:]:
        BROKER_BACKEND = 'memory'
        CELERY_TASK_ALWAYS_EAGER = True
        CELERY_TASK_EAGER_PROPAGATES = True

Or you can override the settings with a decorator directly in your test:

    import unittest
    from django.test.utils import override_settings


    class MyTestCase(unittest.TestCase):

        @override_settings(CELERY_TASK_EAGER_PROPAGATES=True,
                           CELERY_TASK_ALWAYS_EAGER=True,
                           BROKER_BACKEND='memory')
        def test_mytask(self):
            ...
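The `sys.argv` check in the settings variant depends on how the tests are launched. Here is a small sketch of that check in isolation. The helper name `is_test_command` and the command lines passed to it are made up for illustration and are not part of Django or Celery.

```python
def is_test_command(argv):
    """Return True when 'test' appears among the command-line arguments.

    Hypothetical helper: it mirrors the `'test' in sys.argv[1:]` check
    so the condition can be tried against different command lines.
    """
    return "test" in argv[1:]


# Example command lines (assumptions, not taken from the question).
via_manage_py = is_test_command(["manage.py", "test", "myapp"])
via_runserver = is_test_command(["manage.py", "runserver"])
via_pytest = is_test_command(["pytest", "tests/test_jobs.py"])
```

Under these assumptions the check matches `manage.py test` but not a runner started as `pytest`, in which case the decorator variant is probably the safer choice.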
Guillaume Vincent answered Sep 21 '22 12:09


You can use the Kombu in-memory broker to run unit tests; however, to do so you need to spin up a Celery worker using the same Celery app object as the Django server.

To use the in-memory broker, set BROKER_URL to memory://localhost/.

Then, to spin up a small Celery worker, you can do the following:

    import threading

    app = <Django Celery App>

    # Set the worker up to run in-place instead of using a pool
    app.conf.CELERYD_CONCURRENCY = 1
    app.conf.CELERYD_POOL = 'solo'

    # Code to start the worker
    def run_worker():
        app.worker_main()

    # Create a daemon thread and run the worker in it
    t = threading.Thread(target=run_worker)
    t.daemon = True
    t.start()

You need to make sure you use the same app as the Django celery app instance.

Note that starting the worker will print many things and modify logging settings.
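To illustrate why the test in the question blocks, and why a worker thread in the same process fixes it, here is a minimal standard-library sketch. It only models the idea and is not Celery or Kombu code: `task_queue`, `results`, `send_task`, `get_result` and `run_worker` are hypothetical names standing in for the broker, the result backend, the endpoint, `AsyncResult(...).get()` and the worker.

```python
import queue
import threading
import uuid

# Hypothetical stand-ins; none of these names are Celery or Kombu APIs.
task_queue = queue.Queue()  # plays the role of the in-memory broker
results = {}                # plays the role of the result backend
result_ready = {}           # task_id -> Event set once the result is stored


def send_task(func, *args):
    """Queue a call and return its task id, like posting to the endpoint."""
    task_id = str(uuid.uuid4())
    result_ready[task_id] = threading.Event()
    task_queue.put((task_id, func, args))
    return task_id


def get_result(task_id, timeout):
    """Wait for a result by task id; raise TimeoutError if nobody ran the task."""
    if not result_ready[task_id].wait(timeout):
        raise TimeoutError(f"no worker consumed task {task_id}")
    return results[task_id]


def run_worker():
    """Consume tasks forever, storing each result under its task id."""
    while True:
        task_id, func, args = task_queue.get()
        results[task_id] = func(*args)
        result_ready[task_id].set()


def add(x, y):
    return x + y


# Without a worker the task just sits in the queue, so waiting times out.
first_id = send_task(add, 2, 3)
try:
    get_result(first_id, timeout=0.2)
    timed_out = False
except TimeoutError:
    timed_out = True

# Start a daemon worker thread in the same process.
worker = threading.Thread(target=run_worker, daemon=True)
worker.start()

# The worker drains the queue, so the pending task and a new one both finish.
second_id = send_task(add, 10, 20)
first_result = get_result(first_id, timeout=5)
second_result = get_result(second_id, timeout=5)
```

The in-memory transport should behave along the same lines: a queued message stays put until a worker in the same process consumes it, which is why the snippet above starts one in a daemon thread.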

Gal Hochberg answered Sep 22 '22 12:09