Say that I have this task:
@app.task
def do_stuff_for_some_time(some_id):
    e = Model.objects.get(id=some_id)
    e.domanystuff()
and I'm using it like so:
do_stuff_for_some_time.apply_async(args=[some_id], queue='some_queue')
The problem I'm facing is that many duplicate tasks with the same argument are being queued, and they're bogging down the queue.
Is it possible to call apply_async only if a task with the same name and the same args is not already in the queue?
Celery communicates via messages, usually using a broker to mediate between clients and workers. To initiate a task, the Celery client adds a message to the queue, and the broker then delivers that message to a worker. The most commonly used brokers are Redis and RabbitMQ.
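The client → broker → worker flow described above can be sketched with an in-process queue standing in for the broker (a simplified illustration with hypothetical names; real Celery talks to Redis or RabbitMQ over the network and serializes messages):

```python
import queue

# A plain in-process queue stands in for the broker (Redis/RabbitMQ).
broker = queue.Queue()

def client_send(task_name, args):
    # The Celery client serializes the task call into a message
    # and hands it to the broker.
    broker.put({"task": task_name, "args": args})

def worker_poll():
    # A worker takes the next message off the queue and executes it.
    msg = broker.get()
    return f"ran {msg['task']} with args {msg['args']}"

client_send("do_stuff_for_some_time", [42])
print(worker_poll())  # → ran do_stuff_for_some_time with args [42]
```

Because the broker only stores and delivers messages, nothing in this flow checks whether an identical message is already waiting, which is why duplicates pile up by default.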
The celery-singleton package solves this requirement.
Caveat: it requires the Redis broker (for distributed locks).
pip install celery-singleton
Use the Singleton task base class:
from celery_singleton import Singleton

@celery_app.task(base=Singleton)
def do_stuff_for_some_time(some_id):
    e = Model.objects.get(id=some_id)
    e.domanystuff()
From the docs:

    calls to do_stuff.delay() will either queue a new task or return an AsyncResult for the currently queued/running instance of the task