 

Celery: how to limit the number of tasks in a queue and stop feeding it when it is full?

I am very new to Celery and here is the question I have:

Suppose I have a script that continuously fetches new data from a DB and sends it to workers using Celery.

tasks.py

# Celery Task
from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def process_data(x):
    # Do something with x
    pass

fetch_db.py

# Fetch new data from DB and dispatch to workers.
import time

from tasks import process_data


def fetch_new_data():
    # Hypothetical helper: run the DB query here and return the new rows.
    return []


while True:
    fetched_data = fetch_new_data()
    process_data.delay(fetched_data)
    time.sleep(30)

Here is my concern: new data is fetched every 30 seconds, but process_data() can take much longer than that. As far as I understand, with too few workers the tasks would then pile up in the queue faster than they are consumed.
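To make the concern concrete, here is the steady-state arithmetic with hypothetical numbers (one task dispatched every 30 s, each task taking 120 s, 2 worker processes): the workers finish 60 tasks per hour while 120 arrive, so the backlog grows by about 60 tasks every hour and never drains. A minimal sketch, assuming one task per dispatch and one task at a time per worker process; it ignores variation in task duration:

```python
def backlog_growth_per_hour(dispatch_interval_s, task_duration_s, workers):
    """Net number of tasks added to the queue per hour (0 if workers keep up).

    Assumes one task is dispatched per interval and each worker process
    handles one task at a time.
    """
    arrivals_per_hour = 3600 / dispatch_interval_s
    completions_per_hour = 3600 * workers / task_duration_s
    return max(0.0, arrivals_per_hour - completions_per_hour)


# Hypothetical figures: dispatch every 30 s, 120 s per task, 2 workers.
print(backlog_growth_per_hour(30, 120, 2))  # 60.0
```

Whenever the result is above zero, no fixed queue size is ever enough; the only options are to slow the producer down or to drop work.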

  1. I cannot increase the number of workers.
  2. I can modify the code to stop feeding the queue when it is full.

So the question is: how do I set a maximum queue size, and how do I know when the queue is full? More generally, how should I deal with this situation?
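One way to handle this purely on the producer side is back-pressure: before each round, ask the broker how many messages are still waiting and skip the round once that number reaches a threshold. A minimal sketch, assuming RabbitMQ through Celery's default py-amqp transport and the default queue name "celery"; MAX_BACKLOG, broker_backlog, feed_once and run_forever are hypothetical names for illustration, not Celery APIs. It uses a passive queue_declare, which only inspects an existing queue and reports its ready-message count:

```python
import time

MAX_BACKLOG = 10  # hypothetical threshold; tune it to what the workers can drain


def broker_backlog(app, queue_name="celery"):
    """Ready (not yet delivered) messages in a RabbitMQ queue.

    A passive queue_declare inspects an existing queue without creating
    or modifying it. Messages already prefetched by workers (unacked)
    are not included in message_count.
    """
    with app.connection_for_write() as conn:
        ok = conn.default_channel.queue_declare(queue=queue_name, passive=True)
        return ok.message_count


def feed_once(fetch, dispatch, backlog, max_backlog=MAX_BACKLOG):
    """Fetch and dispatch new data only while the queue has room.

    Returns True if a task was dispatched, False if this round was skipped.
    """
    if backlog() >= max_backlog:
        # Queue is "full": don't even run the DB query this round.
        return False
    dispatch(fetch())
    return True


def run_forever(fetch, interval_s=30, queue_name="celery"):
    # Imported here so the helpers above can be used without a broker.
    from tasks import app, process_data

    while True:
        feed_once(fetch, process_data.delay,
                  lambda: broker_backlog(app, queue_name))
        time.sleep(interval_s)
```

Here fetch stands for whatever function runs the DB query. Because fetch is not called on a skipped round, rows are only lost if the query itself marks them as consumed. Since message_count ignores messages workers have already prefetched, setting worker_prefetch_multiplier = 1 makes the count closer to the real backlog.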

asked Feb 05 '16 at 19:02 by jazzblue





1 Answer

You can set RabbitMQ's x-max-length argument on the queue when predeclaring it with kombu, for example:

from celery import Celery
from kombu import Queue, Exchange

class Config(object):
    broker_url = "amqp://guest@localhost//"

    task_queues = (
        Queue(
            'important',
            exchange=Exchange('important'),
            routing_key="important",
            # RabbitMQ keeps at most 10 ready messages in this queue.
            queue_arguments={'x-max-length': 10}
        ),
    )

app = Celery('tasks')
app.config_from_object(Config)


# Route this task to the length-limited queue.
@app.task(queue='important')
def process_data(x):
    pass

Alternatively, you can apply the limit with a RabbitMQ policy:

rabbitmqctl set_policy Ten "^important$" '{"max-length":10}' --apply-to queues
answered Oct 25 '22 at 18:10 by faisal burhanudin
