I am very new to Celery, and here is my question:
Suppose I have a script that continuously fetches new data from a DB and sends it to workers using Celery.
tasks.py
# Celery task
from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def process_data(x):
    # Do something with x
    pass
fetch_db.py
# Fetch new data from DB and dispatch to workers.
from time import sleep

from tasks import process_data

while True:
    # Run the DB query here and store the new data in fetched_data
    fetched_data = None  # placeholder for the query result
    process_data.delay(fetched_data)
    sleep(30)
Here is my concern: the data is fetched every 30 seconds, but process_data() could take much longer than that. Depending on the number of workers (especially if there are too few), tasks could pile up in the queue faster than they are consumed, as I understand it.
The question is: how do I set the queue size, and how do I know when it is full? In general, how should I deal with this situation?
If a task is revoked, the workers ignore the task and do not execute it. If you don't use persistent revokes, your task can be executed after a worker restart. revoke has a terminate option, which is False by default. If you need to kill a task that is already executing, set terminate to True.
Celery communicates via messages, usually using a broker to mediate between clients and workers. To initiate a task, the Celery client adds a message to the queue, and the broker then delivers that message to a worker. The most commonly used brokers are Redis and RabbitMQ.
Celery task canvas: demonstration of a task that runs a startup task, then parallelizes multiple worker tasks, and then fires off a reducer task.
Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system. It's a task queue with focus on real-time processing, while also supporting task scheduling.
You can set RabbitMQ's x-max-length argument when predeclaring the queue with kombu.
Example:
from celery import Celery
from kombu import Queue, Exchange

class Config(object):
    BROKER_URL = "amqp://guest@localhost//"
    CELERY_QUEUES = (
        Queue(
            'important',
            exchange=Exchange('important'),
            routing_key="important",
            # Cap the queue at 10 messages
            queue_arguments={'x-max-length': 10}
        ),
    )

app = Celery('tasks')
app.config_from_object(Config)

@app.task(queue='important')
def process_data(x):
    pass
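One caveat worth checking before relying on x-max-length: by default RabbitMQ handles a full queue by dropping the oldest messages from the head (the drop-head overflow behaviour) rather than refusing new ones, so tasks can disappear without the publisher noticing. The sketch below is a plain-Python simulation of that behaviour, not Celery or RabbitMQ code. publish_drop_head is a hypothetical helper, and collections.deque stands in for the broker queue.

```python
from collections import deque


def publish_drop_head(queue, message):
    """Append a message; return the evicted oldest message, or None.

    Mimics a length-limited queue that makes room by discarding
    from the head (hypothetical helper, for illustration only).
    """
    dropped = None
    if queue.maxlen is not None and len(queue) == queue.maxlen:
        dropped = queue[0]  # the oldest message is about to be evicted
    queue.append(message)   # deque(maxlen=N) discards from the left when full
    return dropped


# Same limit as the 'important' queue above: at most 10 messages.
queue = deque(maxlen=10)

dropped = []
for i in range(12):  # publish 12 messages with no consumer running
    evicted = publish_drop_head(queue, i)
    if evicted is not None:
        dropped.append(evicted)

print("dropped:", dropped)
print("still queued:", list(queue))
```

If losing the oldest tasks is not acceptable, RabbitMQ also accepts an x-overflow queue argument (for example reject-publish), which refuses new messages instead. Whether Celery surfaces that rejection to the caller depends on publisher confirms being enabled, so treat it as something to verify in your setup.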
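Alternatively, apply a limit with a RabbitMQ policy. The pattern must match your queue name; this one caps a queue named one-meg at 1,000,000 bytes: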
rabbitmqctl set_policy Ten "^one-meg$" '{"max-length-bytes":1000000}' --apply-to queues
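For the "how do I know it is full" part, one option is to have the producer check the queue depth before dispatching and wait while it is at the limit. The following is a minimal sketch under stated assumptions: wait_for_capacity and rabbitmq_queue_depth are hypothetical helpers, not Celery APIs. The depth helper assumes a RabbitMQ (amqp) broker and uses a passive queue_declare, which reports message_count without creating the queue; it needs a live broker, so the demo at the bottom uses a fake depth source instead.

```python
import time


def rabbitmq_queue_depth(app, queue_name):
    """Return the number of ready messages in a RabbitMQ queue.

    Assumption: amqp broker. A passive declare only inspects the queue.
    """
    with app.connection_or_acquire() as conn:
        info = conn.default_channel.queue_declare(queue=queue_name, passive=True)
        return info.message_count


def wait_for_capacity(get_depth, max_depth, poll_interval=1.0, timeout=30.0,
                      sleep=time.sleep, clock=time.monotonic):
    """Block until get_depth() < max_depth; return False on timeout."""
    deadline = clock() + timeout
    while get_depth() >= max_depth:
        if clock() >= deadline:
            return False  # still full: the caller decides whether to skip or retry
        sleep(poll_interval)
    return True


# Demo with a fake depth source: full twice, then room appears.
fake_depths = iter([10, 10, 4])
has_room = wait_for_capacity(lambda: next(fake_depths), max_depth=10,
                             sleep=lambda seconds: None)

# A queue that never drains, with a zero timeout, reports "no room".
timed_out = wait_for_capacity(lambda: 10, max_depth=10, timeout=0.0,
                              sleep=lambda seconds: None)
print(has_room, timed_out)
```

In fetch_db.py this could be called before process_data.delay(...), passing something like lambda: rabbitmq_queue_depth(app, 'important') as get_depth. Note that message_count only covers messages still waiting in the queue, not those already prefetched by workers.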