How can I create a wrapper that makes Celery tasks look like asyncio.Task? Or is there a better way to integrate Celery with asyncio?
@asksol, the creator of Celery, said this:
It's quite common to use Celery as a distributed layer on top of async I/O frameworks (top tip: routing CPU-bound tasks to a prefork worker means they will not block your event loop).
But I could not find any code examples specifically for the asyncio framework.
Celery workers consume tasks from the task queue, one at a time. When writing the code for each task, we can still use asyncio, which means writing coroutines with the async and await keywords. Essentially, we are using asyncio to optimise each Celery task.
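As a rough sketch of what "asyncio inside a task" could look like, the function below has the shape a Celery task body might take. The names `fetch_one`, `fetch_all` and `fetch_many` are hypothetical, `asyncio.sleep` stands in for real network I/O, and the `@app.task` decorator is left out on purpose so the snippet runs without a broker.

```python
import asyncio


async def fetch_one(item: int) -> int:
    # Stand-in for real async I/O (an HTTP call, a DB query, ...).
    await asyncio.sleep(0.01)
    return item * 2


async def fetch_all(items: list[int]) -> list[int]:
    # Run the I/O-bound coroutines concurrently; gather keeps input order.
    return await asyncio.gather(*(fetch_one(i) for i in items))


def fetch_many(items: list[int]) -> list[int]:
    # Synchronous entry point. In a real project this is the function that
    # would carry the Celery task decorator (assumption, not shown here).
    # asyncio.run() creates a fresh event loop for each call.
    return asyncio.run(fetch_all(items))


if __name__ == "__main__":
    print(fetch_many([1, 2, 3]))
```

The worker still sees an ordinary synchronous function; the concurrency only exists inside the single task run.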
If you look at the Celery docs on tasks, you see that to call a task synchronously you use the apply() method as opposed to the apply_async() method. The docs also note that if the CELERY_ALWAYS_EAGER setting is set, apply_async() is replaced by a local apply() call instead.
As for asyncio.run(), the Python docs say that it should be used as a main entry point for asyncio programs, and should ideally only be called once. It is new in version 3.7.
EDIT (01/12/2021): The previous answer (find it at the bottom) didn't age well, so I added a combination of possible solutions that may satisfy those who are still looking for a way to use asyncio and Celery together.
Let's quickly break down the use cases first (more in-depth analysis here: asyncio and coroutines vs task queues):
So it makes sense, in the spirit of "Do one thing and do it well", to not try to mix asyncio and Celery together.
BUT what happens in cases where we want to be able to run a method both asynchronously and as a Celery task? Then we have some options to consider:
The best example that I was able to find is the following: https://johnfraney.ca/posts/2018/12/20/writing-unit-tests-celery-tasks-async-functions/ (and I just found out that it is @Franey's response):
Define your async method.
Use asgiref's sync.async_to_sync helper to wrap the async method and run it synchronously inside a Celery task:
    # tasks.py
    import asyncio

    from asgiref.sync import async_to_sync
    from celery import Celery

    app = Celery('async_test', broker='a_broker_url_goes_here')


    async def return_hello():
        await asyncio.sleep(1)
        return 'hello'


    @app.task(name="sync_task")
    def sync_task():
        # Run the coroutine to completion and return its result to Celery.
        return async_to_sync(return_hello)()
A use case that I came upon in a FastAPI application was the reverse of the previous example:
An intense CPU-bound process is hogging the async endpoints.
The solution is to refactor the CPU-bound process into a Celery task and hand it to the Celery queue for execution, so that a worker runs it instead of the event loop.
A minimal example for visualization of that case:
    import uvicorn
    from celery import Celery
    from fastapi import FastAPI

    app = FastAPI(title='Example')
    worker = Celery('worker', broker='a_broker_url_goes_here')


    @worker.task(name='cpu_bound')
    def cpu_bound_task():
        # Does stuff but let's simplify it
        print([n for n in range(1000)])


    @app.get('/calculate')
    async def calculate():
        # Enqueue the task; a Celery worker runs it outside the event loop.
        cpu_bound_task.delay()


    if __name__ == "__main__":
        uvicorn.run('main:app', host='0.0.0.0', port=8000)
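If the endpoint also needs the task's result, one option is to wait for it in a worker thread so the event loop is not blocked. The sketch below is only an illustration under assumptions: it assumes a result backend is configured, `wait_for_result` is a hypothetical helper, and `FakeAsyncResult` is a stand-in for the object returned by `delay()` so the snippet runs without a broker. The only Celery behaviour it relies on is that the result object has a blocking `get(timeout=...)` method.

```python
import asyncio
import time


class FakeAsyncResult:
    """Hypothetical stand-in for the result object returned by delay()."""

    def __init__(self, value, delay=0.05):
        self._value = value
        self._delay = delay

    def get(self, timeout=None):
        # Blocks the calling thread, like a real blocking result lookup.
        time.sleep(self._delay)
        return self._value


async def wait_for_result(async_result, timeout=None):
    # Run the blocking get() in a worker thread (Python 3.9+),
    # so other coroutines keep running in the meantime.
    return await asyncio.to_thread(async_result.get, timeout=timeout)


async def demo():
    ticks = 0

    async def ticker():
        # Can only advance while the event loop is free.
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    ticker_task = asyncio.create_task(ticker())
    value = await wait_for_result(FakeAsyncResult(42))
    ticker_task.cancel()
    return value, ticks


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Each pending wait occupies one thread of the default executor, so this approach suits a handful of in-flight results rather than thousands.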
Another solution seems to be what @juanra and @danius are proposing in their answers, but we have to keep in mind that performance tends to take a hit when we intermix sync and async executions, thus those answers need monitoring before we can decide to use them in a prod environment.
Finally, there are some ready-made solutions that I cannot recommend (because I have not used them myself), but I will list them here:
Well that didn't age so well did it? Version 5.0 of Celery didn't implement asyncio compatibility thus we cannot know when and if this will ever be implemented... Leaving this here for response legacy reasons (as it was the answer at the time) and for comment continuation.
That will be possible from Celery version 5.0 as stated on the official site:
http://docs.celeryproject.org/en/4.0/whatsnew-4.0.html#preface
The above was quoted from the previous link.
So the best thing to do is wait for version 5.0 to be distributed!
In the meantime, happy coding :)
This simple way worked fine for me:
    import asyncio

    from celery import Celery

    app = Celery('tasks')


    async def async_function(param1, param2):
        # more async stuff...
        pass


    @app.task(name='tasks.task_name', queue='queue_name')
    def task_name(param1, param2):
        asyncio.run(async_function(param1, param2))
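Two properties of asyncio.run() are worth keeping in mind with this pattern: it returns the coroutine's result, so the task can return it too, and it raises RuntimeError when called from a thread that already has a running event loop (for example, if the wrapper is called directly from async code). A small sketch, with the Celery decorator left out so it runs standalone; `task_body` and `call_from_running_loop` are hypothetical names:

```python
import asyncio


async def async_function(param1, param2):
    await asyncio.sleep(0)  # placeholder for real async work
    return param1 + param2


def task_body(param1, param2):
    # Same shape as the task above, but the coroutine's result is returned
    # instead of being discarded.
    return asyncio.run(async_function(param1, param2))


async def call_from_running_loop():
    # asyncio.run() refuses to start while another loop runs in this thread.
    coro = async_function(1, 2)
    try:
        asyncio.run(coro)
    except RuntimeError:
        coro.close()  # avoid a "coroutine was never awaited" warning
        return "RuntimeError"
    return "ok"


if __name__ == "__main__":
    print(task_body(1, 2))
    print(asyncio.run(call_from_running_loop()))
```

Inside async code, awaiting `async_function` directly avoids the problem; the synchronous wrapper is only needed at the Celery boundary.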