Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Celery - bulk queue tasks

I have some code that queues a large number (1000s) of celery tasks, for example's sake let's say it's this:

for x in xrange(2000):
    example_task.delay(x)

Is there a better/more efficient way of queuing a large number of tasks at once? They all have different arguments.

like image 547
J. Doe Avatar asked Nov 18 '16 18:11

J. Doe


People also ask

Where are celery tasks stored?

In Celery, a result back end is a place where, when you call a Celery task with a return statement, the task results are stored. Choosing the right results back end can potentially save you hours of pain later.

How do you purge all celery tasks?

Best method I found was redis-cli KEYS "celery*" | xargs redis-cli DEL which worked for me. This will wipe out all tasks stored on the redis backend you're using.

How do you track celery tasks?

Celery -> Redis -> Socket.io -> Browser Fire off the Celery task, get the ID. Keep the ID in your client app, open a socket.io channel to listen for updates. The celery task sends messages to Redis, this will trigger socket.io events. Socket.io relays the messages to the browser, in real time.


2 Answers

Invoking large number of tasks could not be healthy for your celery workers. Also if you are considering collecting result of invoked task then your code will not be optimal.

You can chunck your tasks in batches of certain size. Consider example mentioned in below link.

http://docs.celeryproject.org/en/latest/userguide/canvas.html#chunks

like image 51
Sanket Sudake Avatar answered Oct 14 '22 01:10

Sanket Sudake


We ran into this problem too when we wanted to use Celery to process several million PDFs. Our solution was to write something we call the CeleryThrottle. Basically, you configure the throttle with a desired Celery queue and the number of tasks you want in it, and then you create your tasks in a loop. As you create your tasks, the throttle will monitor the length of the actual queue. If it's being depleted too quickly, it'll speed up your loop for a while so more tasks are added to the queue. If the queue is growing too large, it will slow down your loop and let some of the tasks complete.

Here's the code:

class CeleryThrottle(object):
    """A class for throttling celery."""

    def __init__(self, min_items=100, queue_name='celery'):
        """Create a throttle to prevent celery run aways.

        :param min_items: The minimum number of items that should be enqueued. 
        A maximum of 2× this number may be created. This minimum value is not 
        guaranteed and so a number slightly higher than your max concurrency 
        should be used. Note that this number includes all tasks unless you use
        a specific queue for your processing.
        """
        self.min = min_items
        self.max = self.min * 2

        # Variables used to track the queue and wait-rate
        self.last_processed_count = 0
        self.count_to_do = self.max
        self.last_measurement = None
        self.first_run = True

        # Use a fixed-length queue to hold last N rates
        self.rates = deque(maxlen=15)
        self.avg_rate = self._calculate_avg()

        # For inspections
        self.queue_name = queue_name

    def _calculate_avg(self):
        return float(sum(self.rates)) / (len(self.rates) or 1)

    def _add_latest_rate(self):
        """Calculate the rate that the queue is processing items."""
        right_now = now()
        elapsed_seconds = (right_now - self.last_measurement).total_seconds()
        self.rates.append(self.last_processed_count / elapsed_seconds)
        self.last_measurement = right_now
        self.last_processed_count = 0
        self.avg_rate = self._calculate_avg()

    def maybe_wait(self):
        """Stall the calling function or let it proceed, depending on the queue.

        The idea here is to check the length of the queue as infrequently as 
        possible while keeping the number of items in the queue as closely 
        between self.min and self.max as possible.

        We do this by immediately enqueueing self.max items. After that, we 
        monitor the queue to determine how quickly it is processing items. Using 
        that rate we wait an appropriate amount of time or immediately press on.
        """
        self.last_processed_count += 1
        if self.count_to_do > 0:
            # Do not wait. Allow process to continue.
            if self.first_run:
                self.first_run = False
                self.last_measurement = now()
            self.count_to_do -= 1
            return

        self._add_latest_rate()
        task_count = get_queue_length(self.queue_name)
        if task_count > self.min:
            # Estimate how long the surplus will take to complete and wait that
            # long + 5% to ensure we're below self.min on next iteration.
            surplus_task_count = task_count - self.min
            wait_time = (surplus_task_count / self.avg_rate) * 1.05
            time.sleep(wait_time)

            # Assume we're below self.min due to waiting; max out the queue.
            if task_count < self.max:
                self.count_to_do = self.max - self.min
            return

        elif task_count <= self.min:
            # Add more items.
            self.count_to_do = self.max - task_count
            return

And we use it like:

throttle = CeleryThrottle(min_items=30, queue_name=queue)
for item in items:
    throttle.maybe_wait()
    do_something.delay()

So it's pretty simple to use, and it does a pretty good job of keeping the queue in a happy place — not too long, not too short. It keeps a rolling average of the rate that the queue is depleting, and it can adjust it's own timers accordingly.

like image 36
mlissner Avatar answered Oct 14 '22 01:10

mlissner