 

How to update Celery Task ETA?

I am building a simple waiting-list app in Django 1.10.3 using Celery 4.1.0.

I have the following base tasks:

import datetime
import logging

from celery import shared_task


@shared_task
def start_user_counter():
    logging.info('Task executed @ {}'.format(datetime.datetime.utcnow()))
    # This task is executed when the user reaches the top of the queue.
    # Send email, perform other stuff in here ...

@shared_task
def update_queue():
    curr_time = datetime.datetime.utcnow()
    logging.info('Task called @ {}'.format(curr_time))
    time_to_exec = curr_time + datetime.timedelta(seconds=10)
    # Here, perform checks if task already exists in Redis
    # if it does not exist - create a new one and store it to Redis
    # if it does exist - update task's ETA.
    # apply_async() returns an AsyncResult; its .id is the task ID
    task_id = start_user_counter.apply_async(eta=time_to_exec).id
    logging.info('Task ID: {}'.format(task_id))
    # ...

update_queue.delay()

Each task represents one user on the waiting list. A new user is assigned an ETA: the time at which they are supposed to be removed from the waiting list (i.e. when they reach the top). However, each user can also speed up the time at which they reach the top of the waiting list.

Question: How can I update the ETA of an already existing task so that it executes earlier than first anticipated?

an0o0nym asked Aug 21 '17 14:08




1 Answer

I have managed to solve this problem. My solution was to create a sorted set in Redis. As the score of each user entry in that set, I used a UTC timestamp derived from the time the user was added to the waiting list (plus the default waiting time, so the score is also the user's ETA). This kept the users in the waiting list in the right order.
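The score arithmetic can be illustrated without a Redis server. The sketch below is a minimal pure-Python illustration, assuming the answer's 14-day WAITING_TIME and a one-day bump; the helper names (initial_score, score_to_eta, queue_order) are hypothetical, and a plain dict stands in for the Redis sorted set, ordered by ascending score with ties broken by member, as ZRANGE does.

```python
import datetime as dt

# Assumed constants mirroring the answer: 14 days of waiting, 1-day bumps.
WAITING_TIME = 14 * 24 * 60 * 60
ONE_DAY = 24 * 60 * 60


def initial_score(joined_at):
    """Score = POSIX timestamp of when the user joined + the waiting time."""
    return int(joined_at.timestamp()) + WAITING_TIME


def score_to_eta(score):
    """Turn a score back into a timezone-aware UTC datetime (usable as a Celery eta)."""
    return dt.datetime.fromtimestamp(score, tz=dt.timezone.utc)


def queue_order(scores):
    """Hypothetical stand-in for ZRANGE: members by ascending score, ties by member."""
    return [user for user, _ in sorted(scores.items(), key=lambda item: (item[1], item[0]))]


if __name__ == '__main__':
    t0 = dt.datetime(2017, 8, 21, 14, 0, tzinfo=dt.timezone.utc)
    scores = {
        'alice': initial_score(t0),
        'bob': initial_score(t0 + dt.timedelta(hours=1)),
    }
    print(queue_order(scores))            # ['alice', 'bob']
    print(score_to_eta(scores['alice']))  # 2017-09-04 14:00:00+00:00

    # "Bumping" bob by one day, like ZINCRBY with a negative amount
    scores['bob'] -= ONE_DAY
    print(queue_order(scores))            # ['bob', 'alice']
    print(score_to_eta(scores['bob']))    # 2017-09-03 15:00:00+00:00
```

Because the score already encodes the ETA, moving a user up the queue and computing their new ETA are the same operation: decrement the score, then convert it back to a datetime.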

I also used a Redis hash to store the celery.result.AsyncResult.id that I received right after creating the Celery task with notify_user.apply_async((self.id,), eta=eta).id (see more below).

Then, whenever I needed to update a task's ETA, I made the workers ignore the existing task by revoking it: AsyncResult(self.get_task_id()).revoke(). AsyncResult(self.get_task_id()) returns a result object for querying the state of the task whose id I got from self.get_task_id(). Calling .revoke() on this AsyncResult instance makes any worker that receives the task, or has already reserved it, ignore it.

This allowed me to create a completely new task with a new ETA, whose id I stored back in the same user record in Redis, overwriting the old id value.

My code example is specific to my case, but the bottom line is:

  • Create a brand-new task and store its celery.result.AsyncResult.id somewhere (e.g. self.task_id = T.apply_async((args,), eta=eta).id).
  • If your new ETA depends on the value of the previous ETA, store that value somewhere as well (e.g. self.eta = eta).
  • Create an AsyncResult(task_id) instance for the old task and call its .revoke() method so that workers ignore it (e.g. AsyncResult(self.task_id).revoke()).
  • Calculate the new ETA and create a new task with it (e.g. self.task_id = T.apply_async((args,), eta=new_eta).id).
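These four steps can be sketched independently of Celery and Redis. The version below is a minimal sketch, assuming a plain dict stands in for the user's Redis hash and that the Celery calls are passed in as callables, so the logic runs without a broker. The function name reschedule and the 'task_id'/'eta' keys are hypothetical; with Celery, schedule would wrap T.apply_async((args,), eta=eta).id and revoke would wrap AsyncResult(task_id).revoke().

```python
import datetime as dt
import itertools


def reschedule(record, schedule, revoke, new_eta):
    """Replace the task stored in record['task_id'] with one that runs at new_eta.

    record:   mutable mapping standing in for the user's Redis hash (hypothetical)
    schedule: callable(eta) -> new task id, e.g. wrapping T.apply_async(..., eta=eta).id
    revoke:   callable(task_id), e.g. wrapping AsyncResult(task_id).revoke()
    """
    old_id = record.get('task_id')
    if old_id is not None:
        revoke(old_id)          # workers will ignore the old task message
    new_id = schedule(new_eta)  # publish a fresh task with the new ETA
    record['task_id'] = new_id  # overwrite the stored id
    record['eta'] = new_eta     # keep the ETA in case the next one depends on it
    return new_id


if __name__ == '__main__':
    ids = itertools.count(1)

    def fake_schedule(eta):
        # stands in for T.apply_async((args,), eta=eta).id
        return 'task-{}'.format(next(ids))

    revoked = []
    record = {}
    eta = dt.datetime(2017, 9, 4, 14, 0, tzinfo=dt.timezone.utc)
    reschedule(record, fake_schedule, revoked.append, eta)
    reschedule(record, fake_schedule, revoked.append, eta - dt.timedelta(days=1))
    print(record['task_id'], revoked)  # task-2 ['task-1']
```

Revoking before scheduling mirrors the order of the steps above. Keep in mind that, according to the Celery documentation, workers hold the list of revoked task ids in memory, so if all workers restart the revocations are lost unless the workers are started with the --statedb option.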

#utils.py
import datetime as dt

import redis
from celery.result import AsyncResult
from django.conf import settings

from .tasks import notify_user


KEY_DATA = 'user:data'
KEY_QUEUE = 'user:queue'
TIME_DELTA = 'time_delta'
TASK_ID = 'task_id'
WAITING_TIME = 14 * 24 * 60 * 60  # 14 days by default

# NOTE: the calls below use the redis-py >= 3.5 API
# (zadd/zincrby signatures changed in 3.0; hset(mapping=...) replaces hmset)
r = redis.StrictRedis(host=settings.REDIS_HOST,
                      port=settings.REDIS_PORT,
                      db=settings.REDIS_DB)


class UserEntry(object):
    def __init__(self, user_id):
        self.id = user_id
        # dynamically creates a string for each user that is later used
        # as the key of that user's hash in Redis
        self.user_key = '{}:{}'.format(KEY_DATA, user_id)
        self.create_or_update()

    def create_or_update(self, data=None):
        """
        Set up a new user entry, or update the existing one with `data`.
        :return: None
        """
        if self.exist():
            # data exist for user with user_id - update it (if anything was passed)
            if data:
                r.hset(self.user_key, mapping=data)
        else:
            # this is a new user - create a new entry and schedule its task
            self.add_user()
            eta = dt.datetime.fromtimestamp(self.get_score(), tz=dt.timezone.utc)
            task_id = notify_user.apply_async((self.id,), eta=eta).id
            r.hset(self.user_key, mapping={TASK_ID: task_id})

    def add_user(self):
        """
        Appends user's ID to the end of the queue.

        :return: None
        """
        if self.get_index() is not None:
            # if the user entry exists, simulate the NX option of ZADD:
            # don't update already existing elements, always add new ones
            # (zrank returns 0 for the first user, so compare against None)
            return

        # use a UTC POSIX timestamp (now + waiting time) as the score;
        # the score is the user's ETA
        utc_time = dt.datetime.now(dt.timezone.utc)
        score = int(utc_time.timestamp()) + WAITING_TIME

        r.zadd(KEY_QUEUE, {self.id: score})

    def get_score(self):
        """
        Gets user's score (current ETA).

        :return: timestamp representing value of user's ETA
        """
        return r.zscore(KEY_QUEUE, self.id)

    def get_index(self):
        """
        Gets user's position in the queue.

        :return: 0-based index value representing user's position in the queue
        """
        return r.zrank(KEY_QUEUE, self.id)

    def get_task_id(self):
        """
        Helper method to get task ID for the user
        :return: value of user task's ID
        """
        return r.hget(self.user_key, TASK_ID).decode('ascii')

    def set_score(self, score_delta):
        """
        Move user in the queue by changing their score.

        :param score_delta: number of seconds added to the user's score
            (current ETA); pass a negative value to move the user up
        :return: timestamp representing user's new score (ETA)
        """
        return r.zincrby(KEY_QUEUE, score_delta, self.id)

    def exist(self):
        """
        Helper method used to define whether user exists in queue
        :return: dict of the hash's name/value pairs (empty if no entry exists)
        """
        return r.hgetall(self.user_key)

    def bump(self):
        """
        Move user up in the queue
        :return: None
        """
        if not self.exist():
            return

        # revoke the current task associated with the user so that
        # workers ignore it when its old ETA arrives
        AsyncResult(self.get_task_id()).revoke()

        # we need to decrement the ETA, thus the minus sign;
        # here time_delta is 1 day, i.e. 1 * 24 * 60 * 60 seconds
        time_delta = -(WAITING_TIME // 14)
        new_score = self.set_score(time_delta)
        # the new ETA is the user's updated score, not the delta itself
        new_eta = dt.datetime.fromtimestamp(new_score, tz=dt.timezone.utc)
        task_id = notify_user.apply_async((self.id,), eta=new_eta).id
        self.create_or_update({TASK_ID: task_id})

#tasks.py
import datetime
import logging

from celery import shared_task


@shared_task
def notify_user(user_id):
    logging.info('Task executed @ {}'.format(datetime.datetime.utcnow()))
    logging.info('UserID: {}'.format(user_id))
    # This task is executed when user reaches the Top of the queue.
    # Send email, perform other stuff in here ...


#models.py
from django.db.models.signals import post_save
from django.dispatch import receiver

from .utils import UserEntry


# MyUser is assumed to be your user model, defined earlier in this models.py
@receiver(post_save, sender=MyUser)
def create_user_entry_in_waiting_list(sender, instance=None, created=False, **kwargs):
    if created:
        # create user entry in the waiting_list
        user_id = instance.id
        UserEntry(user_id)
an0o0nym answered Oct 20 '22 07:10