I'd like to use Celery as a queue for my tasks, so that my web app can enqueue a task, return a response, and have the task processed in the meantime (or at some later point). I'm building a kind of API, so I don't know in advance what sort of tasks there will be: in the future there may be tasks dealing with HTTP requests or other I/O, but also CPU-intensive tasks. For that reason, I'd like to run Celery's workers as processes, since processes are the most universal kind of parallelism in Python.
However, I'd like to use gevent in my tasks too, so I could have a single task spawning many HTTP requests, etc. The problem is, when I do this:
    from gevent import monkey
    monkey.patch_all()

Celery stops working. It starts, but no tasks can be effectively enqueued: they seem to reach the broker, but the Celery worker never picks them up and processes them. It only starts and waits. If I delete those lines and run the task without any gevent or parallelization, everything works.
I think it could be because gevent also patches threading. So I tried

    from gevent import monkey
    monkey.patch_all(thread=False)

...but then Celery doesn't even start; it crashes without giving a reason (even with debug-level logging turned on).
Is it possible to use Celery for enqueuing tasks and gevent for doing some work inside a single task? How? What am I doing wrong?
Celery itself uses billiard (a fork of multiprocessing) to run your tasks in separate processes.
If you look at the Celery docs on tasks, you'll see that to call a task synchronously you use the apply() method, as opposed to the apply_async() method. The docs also note that if the CELERY_ALWAYS_EAGER setting is set, apply_async() will be replaced by a local apply() call instead.
Celery is a task queue/job queue based on asynchronous message passing. You can use it as a background task processor: your application hands tasks over to it to run in the background, either immediately or at a later time. Tasks can be executed synchronously or asynchronously.
This way, you delegate queue creation to Celery. You can use apply_async with any queue and Celery will handle it, provided your task is aware of the queue used by apply_async. If none is provided, the worker will listen only on the default queue.
I believe the recommended way to start the task is as follows.
    python manage.py celery worker -P gevent --loglevel=INFO

Gevent needs to be patched as early as possible.
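For illustration, this is roughly the shape of the work a single task could do once patching has happened early enough. It is only a sketch: fetch and fan_out are hypothetical names, gevent.sleep stands in for a real HTTP call, and the explicit monkey.patch_all() at the top is there so the snippet runs on its own. As far as I know, a worker started with -P gevent applies the patch itself during startup, so the task module would not need to repeat it.

```python
# Patch first, before anything else imports socket, ssl or threading.
from gevent import monkey
monkey.patch_all()

import gevent


def fetch(job_id):
    """Hypothetical stand-in for one blocking HTTP request."""
    gevent.sleep(0.01)  # yields to other greenlets, like patched socket I/O
    return job_id * 2


def fan_out(n):
    """Body a single task might run: spawn n greenlets and collect results."""
    greenlets = [gevent.spawn(fetch, i) for i in range(n)]
    gevent.joinall(greenlets, timeout=5)
    return [g.value for g in greenlets]


results = fan_out(5)
print(results)
```

All five calls wait concurrently, so the whole batch takes about as long as one request rather than five.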
You can run Celery as multiple worker processes, each running many greenlets, like this:
    $ celery multi start 4 -P gevent -l info -c:1-4 1000