Celery task schedule (Ensuring a task is only executed one at a time)

I have a task, somewhat like this:

@task()
def async_work(info):
    ...

At any moment, I may call async_work with some info. For some reason, I need to make sure that only one async_work is running at a time; any other calls must wait for it.

So I come up with the following code:

is_locked = False    
@task()
def async_work(info):
    while is_locked:
        pass
    is_locked = True
    ...
    is_locked = False

But it complains that it's invalid to access local variables... How can I solve this?

asked Aug 17 '12 by Mike Lee

3 Answers

It is invalid to access local variables since you can have several Celery workers running tasks, and those workers might even be on different hosts. So, basically, there are as many is_locked variable instances as there are Celery workers running your async_work task. Thus, even though your code wouldn't raise any errors, you wouldn't get the desired effect with it.
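
For completeness, here is a sketch (not from the original answer) of why even a syntactically fixed version would not help: assigning to is_locked inside the function makes it a local variable, which is the error the asker hit, and adding global removes that error, but the flag still lives separately in every worker process:

is_locked = False

def async_work(info):
    # Without this line, the assignments below make Python treat
    # is_locked as a new local variable, which is the error in the question.
    global is_locked
    while is_locked:
        pass
    is_locked = True
    try:
        pass  # ... the actual work ...
    finally:
        is_locked = False

# Even with `global`, every Celery worker process (possibly on another host)
# has its own copy of is_locked, so tasks are still not serialized.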

To achieve your goal you need to configure Celery to run only one worker. Since that worker can only process a single task at any given time, you get what you need.

EDIT:

According to Workers Guide > Concurrency:

By default multiprocessing is used to perform concurrent execution of tasks, but you can also use Eventlet. The number of worker processes/threads can be changed using the --concurrency argument and defaults to the number of CPUs available on the machine.

Thus you need to run the worker like this:

$ celery worker --concurrency=1
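
Alternatively, if you prefer to pin this down in code rather than on the command line, here is a minimal sketch setting the equivalent option on the app object (the app name and broker URL are placeholders; newer Celery versions call the setting worker_concurrency, older ones CELERYD_CONCURRENCY):

from celery import Celery

# Placeholder app name and broker URL; adjust to your project.
app = Celery('proj', broker='redis://localhost:6379/0')

# Equivalent to starting the worker with --concurrency=1: this worker
# processes only one task at a time.
app.conf.worker_concurrency = 1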

EDIT 2:

Surprisingly, there's another solution; moreover, it is even in the official docs: see the Ensuring a task is only executed one at a time article.

answered by Ihor Kaharlichenko


You probably don't want to use concurrency=1 for your Celery workers, because you want your tasks to be processed concurrently. Instead you can use some kind of locking mechanism. Just make sure the cache timeout is bigger than the time needed to finish your task.

Redis

import redis
from contextlib import contextmanager

redis_client = redis.Redis(host='localhost', port=6379)  # 6379 is the default Redis port


@contextmanager
def redis_lock(lock_name):
    """Yield True if lock_name was not already set in Redis, otherwise None.

    Provides a simple lock: SET with nx=True only succeeds if the key
    does not exist yet.
    """
    status = redis_client.set(lock_name, 'lock', nx=True)
    try:
        yield status
    finally:
        # Only delete the key if we acquired the lock ourselves; otherwise we
        # would release a lock held by another worker.
        if status:
            redis_client.delete(lock_name)


@task()
def async_work(info):
    with redis_lock('my_lock_name') as acquired:
        if acquired:
            do_some_work()
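
One caveat with the snippet above: the lock key never expires, so a worker that crashes mid-task leaves it in Redis forever. A hedged variation of the acquire call, using redis-py's ex argument and an illustrative LOCK_EXPIRE value (pick something longer than your slowest task):

import redis

LOCK_EXPIRE = 60 * 10  # illustrative value; must exceed the longest task runtime

redis_client = redis.Redis(host='localhost', port=6379)

# Same acquire call as in redis_lock above, but the key auto-expires after
# LOCK_EXPIRE seconds, so a crashed worker cannot hold the lock forever.
status = redis_client.set('my_lock_name', 'lock', nx=True, ex=LOCK_EXPIRE)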

Memcache

Example inspired by the Celery documentation:

from contextlib import contextmanager
from django.core.cache import cache

@contextmanager
def memcache_lock(lock_name):
    # cache.add only succeeds if the key is not already set; the cache's
    # default timeout applies, so make sure it exceeds the task's runtime.
    status = cache.add(lock_name, 'lock')
    try:
        yield status
    finally:
        # Release the lock only if we acquired it ourselves.
        if status:
            cache.delete(lock_name)


@task()
def async_work(info):
    with memcache_lock('my_lock_name') as acquired:
        if acquired:
            do_some_work()

answered by jozo


I have implemented a decorator to handle this. It's based on Ensuring a task is only executed one at a time from the official Celery docs.

It uses the function's name and its args and kwargs to create a lock_id, which is set and read in Django's cache layer (I have only tested this with Memcached, but it should work with Redis as well). If the lock_id is already set in the cache, the decorator puts the task back on the queue and exits.

import functools
from time import monotonic

from django.core.cache import cache

CACHE_LOCK_EXPIRE = 30


def no_simultaneous_execution(f):
    """
    Decorator that prevents a task from being executed with the
    same *args and **kwargs more than once at a time.
    """
    @functools.wraps(f)
    def wrapper(self, *args, **kwargs):
        # Create the lock_id used as the cache key
        lock_id = '{}-{}-{}'.format(self.name, args, kwargs)

        # Timeout with a small diff, so we'll leave the lock delete
        # to the cache if it's close to being auto-removed/expired
        timeout_at = monotonic() + CACHE_LOCK_EXPIRE - 3

        # Try to acquire the lock, or put the task back on the queue
        lock_acquired = cache.add(lock_id, True, CACHE_LOCK_EXPIRE)
        if not lock_acquired:
            self.apply_async(args=args, kwargs=kwargs, countdown=3)
            return

        try:
            f(self, *args, **kwargs)
        finally:
            # Release the lock, unless the cache is about to expire it anyway
            if monotonic() < timeout_at:
                cache.delete(lock_id)
    return wrapper

You would then apply it to any task as the first decorator:

@shared_task(bind=True, base=MyTask)
@no_simultaneous_execution
def sometask(self, some_arg):
  ...
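
For illustration (the argument value here is made up), two overlapping calls would behave like this:

# Both calls use the same args, so they map to the same lock_id.
sometask.delay('some-arg')  # acquires the lock and runs
sometask.delay('some-arg')  # lock already taken: re-queued with countdown=3
                            # until the first call finishes and frees the lock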

answered by Andreas Bergström