delay(...)" just call "task(...)" from your unit tests. Use CELERY_ALWAYS_EAGER. This will cause your tasks to be called immediately at the point you say "task.delay(...)", so you can test the whole path (but not any asynchronous behavior).
It is possible to test tasks synchronously using any unittest lib out there. I normally run two different test sessions when working with Celery tasks. The first one (as I'm suggesting below) is completely synchronous and should be the one that makes sure the algorithm does what it should do. The second session uses the whole system (including the broker) and makes sure I'm not having serialization issues or any other distribution or communication problems.
So:
from celery import Celery

celery = Celery()

@celery.task
def add(x, y):
    return x + y
And your test:
from nose.tools import eq_

def test_add_task():
    rst = add.apply(args=(4, 4)).get()
    eq_(rst, 8)
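To see why the second, full-system session is still worth having, here is a rough sketch of the kind of serialization surprise a purely synchronous test can miss. It uses the standard library's json module as a stand-in for the message body; Celery's real serialization goes through kombu, which may treat some types differently, so take this as an illustration only. The helper name json_roundtrip is made up.

```python
import json


def json_roundtrip(args, kwargs):
    """Encode and decode task arguments as a JSON message body would."""
    return json.loads(json.dumps({"args": args, "kwargs": kwargs}))


# Plain numbers survive, but the args tuple comes back as a list.
decoded = json_roundtrip((4, 4), {})

# A set cannot be encoded by the standard library JSON encoder at all.
try:
    json_roundtrip(({1, 2},), {})
    set_is_serializable = True
except TypeError:
    set_is_serializable = False
```

A task that checks isinstance(arg, tuple), or that is handed a set, can pass when called in-process and still fail once its arguments have crossed a broker.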
Hope that helps!
An update to my seven-year-old answer:
You can run a worker in a separate thread via a pytest fixture:
https://docs.celeryproject.org/en/stable/userguide/testing.html#celery-worker-embed-live-worker
According to the docs, you should not use "always_eager" (see the top of the page linked above).
Old answer:
I use this:
from unittest import mock  # or "import mock" on Python 2

with mock.patch('celeryconfig.CELERY_ALWAYS_EAGER', True, create=True):
    ...
Docs: http://docs.celeryproject.org/en/3.1/configuration.html#celery-always-eager
CELERY_ALWAYS_EAGER lets you run your tasks synchronously, so you don't need a Celery server.
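For anyone unsure what create=True does in the patch above, here is a small standalone sketch. It registers an empty stand-in module named celeryconfig (hypothetical; in a real project this is your own config file) so the snippet runs without Celery installed.

```python
import sys
import types
from unittest import mock

# Stand-in for a real celeryconfig module (hypothetical, for illustration).
celeryconfig = types.ModuleType("celeryconfig")
sys.modules["celeryconfig"] = celeryconfig

seen_inside = None
with mock.patch("celeryconfig.CELERY_ALWAYS_EAGER", True, create=True):
    # create=True lets patch add the attribute even though the module
    # does not define it yet.
    seen_inside = celeryconfig.CELERY_ALWAYS_EAGER

# On exit the attribute is removed again, since it did not exist before.
still_defined = hasattr(celeryconfig, "CELERY_ALWAYS_EAGER")
```

Note that this only changes the module attribute. Whether the app actually sees the patched value presumably depends on when it reads its configuration, so the patch should be in place before the app loads its settings.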
Depends on what exactly you want to be testing.
import unittest
from myproject.myapp import celeryapp

class TestMyCeleryWorker(unittest.TestCase):

    def setUp(self):
        celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)
# conftest.py
import pytest
from myproject.myapp import celeryapp

@pytest.fixture(scope='module')
def celery_app(request):
    celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)
    return celeryapp

# test_tasks.py
def test_some_task(celery_app):
    ...
from celery import current_app

def send_task(name, args=(), kwargs={}, **opts):
    # https://github.com/celery/celery/issues/581
    # Look the task up by name and run it locally instead of sending it.
    task = current_app.tasks[name]
    return task.apply(args, kwargs, **opts)

current_app.send_task = send_task