When a Django test case runs, it creates an isolated test database so that database writes get rolled back when each test completes. I am trying to create an integration test with Celery, but I can't figure out how to connect Celery to this ephemeral test database. In the naive setup, Objects saved in Django are invisible to Celery and objects saved in Celery persist indefinitely.
Here is an example test case:
import json
from rest_framework.test import APITestCase
from myapp.models import MyModel
from myapp.util import get_result_from_response
class MyTestCase(APITestCase):
@classmethod
def setUpTestData(cls):
# This object is not visible to Celery
MyModel(id='test_object').save()
def test_celery_integration(self):
# This view spawns a Celery task
# Task should see MyModel.objects.get(id='test_object'), but can't
http_response = self.client.post('/', 'test_data', format='json')
result = get_result_from_response(http_response)
result.get() # Wait for task to finish before ending test case
# Objects saved by Celery task should be deleted, but persist
I have two questions:
How do make it so that Celery can see the objects that the Django test case?
How do I ensure that all objects saved by Celery are automatically rolled back once the test completes?
I am willing to manually clean up the objects if doing this automatically is not possible, but a deletion of objects in tearDown
even in APISimpleTestCase
seems to be rolled back.
This is possible by starting a Celery worker within the Django test case.
Django's in-memory database is sqlite3. As it says on the description page for Sqlite in-memory databases, "[A]ll database connections sharing the in-memory database need to be in the same process." This means that, as long as Django uses an in-memory test database and Celery is started in a separate process, it is fundamentally impossible to have Celery and Django to share a test database.
However, with celery.contrib.testing.worker.start_worker
, it possible to start a Celery worker in a separate thread within the same process. This worker can access the in-memory database.
This assumes that Celery is already setup in the usual way with the Django project.
Because Django-Celery involves some cross-thread communication, only test cases that don't run in isolated transactions will work. The test case must inherit directly from SimpleTestCase
or its Rest equivalent APISimpleTestCase
and set databases
to '__all__'
or just the database that the test interacts with.
The key is to start a Celery worker in the setUpClass
method of the TestCase
and close it in the tearDownClass
method. The key function is celery.contrib.testing.worker.start_worker
, which requires an instance of the current Celery app, presumably obtained from mysite.celery.app
and returns a Python ContextManager
, which has __enter__
and __exit__
methods, which must be called in setUpClass
and tearDownClass
, respectively. There is probably a way to avoid manually entering and existing the ContextManager
with a decorator or something, but I couldn't figure it out. Here is an example tests.py
file:
from celery.contrib.testing.worker import start_worker
from django.test import SimpleTestCase
from mysite.celery import app
class BatchSimulationTestCase(SimpleTestCase):
databases = '__all__'
@classmethod
def setUpClass(cls):
super().setUpClass()
# Start up celery worker
cls.celery_worker = start_worker(app, perform_ping_check=False)
cls.celery_worker.__enter__()
@classmethod
def tearDownClass(cls):
super().tearDownClass()
# Close worker
cls.celery_worker.__exit__(None, None, None)
def test_my_function(self):
# my_task.delay() or something
For whatever reason, the testing worker tries to use a task called 'celery.ping'
, probably to provide better error messages in the case of worker failure. The task it is looking for is celery.contrib.testing.tasks.ping
, which is not available at test time. Setting the perform_ping_check
argument of start_worker
to False
skips the check for this and avoids the associated error.
Now, when the tests are run, there is no need to start a separate Celery process. A Celery worker will be started in the Django test process as a separate thread. This worker can see any in-memory databases, including the default in-memory test database. To control the number of workers, there are options available in start_worker
, but it appears the default is a single worker.
For your unittests I would recommend skipping the celery dependency, the two following links will provide you with the necesarry infos to start your unittests:
If you really want to test the celery function calls including a queue I'd propably set up a dockercompose with the server, worker, queue combination and extend the custom CeleryTestRunner from the django-celery docs. But I wouldn't see a benefit from it because the test system is pbly to far away from production to be representative.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With