Is it possible to skip delegating a Celery task if the same task name and params are already queued on the server?

Say that I have this task:

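@celery_app.task  # assumes a Celery app instance named celery_app
def do_stuff_for_some_time(some_id):
    # Load the record and run the long job on it
    e = Model.objects.get(id=some_id)
    e.domanystuff()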

and I'm using it like so:

do_stuff_for_some_time.apply_async(args=[some_id], queue='some_queue')

The problem I'm facing is that a lot of repetitive tasks with the same argument get queued, and they are bogging down the queue.

Is it possible to call apply_async only if the same task with the same args is not already in the queue?

asked Jul 14 '17 16:07 by Stupid.Fat.Cat


1 Answer

celery-singleton solves this requirement.

Caveat: it needs Redis to store its distributed locks.

pip install celery-singleton

Use the Singleton task base class:

from celery_singleton import Singleton

@celery_app.task(base=Singleton)
def do_stuff_for_some_time(some_id):
    e = Model.objects.get(id=some_id)
    e.domanystuff()


From the docs:

calls to do_stuff.delay() will either queue a new task or return an AsyncResult for the currently queued/running instance of the task

answered Nov 12 '22 03:11 by stacksonstacks