
Celery periodic_task running multiple times in parallel

Tags:

python

celery

I have some very simple periodic code using Celery's threading; it simply prints "Pre" and "Post" and sleeps in between. It is adapted from this StackOverflow question and this linked website.

from datetime import timedelta
from threading import Lock
from time import sleep
import socket

from celery.task import task, periodic_task
from django.core.cache import cache

import main
import cutout_score

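from functools import wraps


def single_instance_task(timeout):
    """Skip a run while an earlier run of the same task still holds the lock."""
    def task_exc(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            lock_id = "celery-single-instance-" + func.__name__
            # cache.add only succeeds if the key does not already exist
            acquire_lock = lambda: cache.add(lock_id, "true", timeout)
            release_lock = lambda: cache.delete(lock_id)
            if acquire_lock():
                try:
                    # Pass the task's arguments through to the wrapped function
                    return func(*args, **kwargs)
                finally:
                    release_lock()
        return wrapper
    return task_exc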

LOCK_EXPIRE = 60 * 5  # Lock expires in 5 minutes


@periodic_task(run_every=timedelta(seconds=2))
def test():
    lock_id = "lock"

    # cache.add fails if the key already exists
    acquire_lock = lambda: cache.add(lock_id, "true", LOCK_EXPIRE)
    # memcache delete is very slow, but we have to use it to take
    # advantage of using add() for atomic locking
    release_lock = lambda: cache.delete(lock_id)

    if acquire_lock():
        try:
            print 'pre'
            sleep(20)
            print 'post'
        finally:
            release_lock()
        return
    print 'already in use...'

This code never prints 'already in use...'; the same phenomenon occurs when I use the @single_instance_task decorator.

Do you know what's wrong?

Edit: I've simplified the question so that it doesn't write to memory (using a global or the django cache); I still never see 'already in use...'


Edit: When I add the following code to my Django settings.py file (adapted from https://docs.djangoproject.com/en/dev/topics/cache/), everything works as hoped, but only when I use port 11211 (oddly enough, my server is on port 8000).

CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.memcached.MemcachedCache',
        'LOCATION': [
            '127.0.0.1:11211'
        ]
    }
}
asked Feb 02 '26 20:02

user


1 Answer

How are you running celeryd? I'm not familiar with a threaded option.

If it's running multi-process, then there are no "global" variables shared in memory between workers.
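
To illustrate the point about process-local memory, here is a minimal sketch in Python 3 syntax (the question's own code is Python 2). It does not use Celery or Django; `_local_cache`, `add` and `child_acquires_lock` are hypothetical names, and a second interpreter started with `subprocess` stands in for a second worker process. The assumption is that any lock kept in per-process memory behaves like the plain dict used here.

```python
import subprocess
import sys

# Hypothetical in-process "cache": a plain dict that lives in this process only.
_local_cache = {}


def add(key, value):
    """Mimic cache.add(): store the value and return True only if the key is absent."""
    if key in _local_cache:
        return False
    _local_cache[key] = value
    return True


# Source run by a separate interpreter, standing in for a second worker process.
CHILD_SOURCE = """
_local_cache = {}
def add(key, value):
    if key in _local_cache:
        return False
    _local_cache[key] = value
    return True
print(add("lock", "true"))
"""


def child_acquires_lock():
    """Return True if a fresh process manages to take the same lock."""
    result = subprocess.run(
        [sys.executable, "-c", CHILD_SOURCE],
        capture_output=True, text=True, check=True,
    )
    return result.stdout.strip() == "True"


parent_first = add("lock", "true")    # the lock is free in this process
parent_second = add("lock", "true")   # the same process sees its own lock
child_got_it = child_acquires_lock()  # the child starts with its own empty dict
print(parent_first, parent_second, child_got_it)  # True False True
```

The parent refuses its own second attempt, but the child process takes the "same" lock without complaint, which is why a lock has to live in something all workers share, such as memcached.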

If you want a counter shared between all workers, then I'd suggest you use cache.incr.

E.g.:

In [1]: from django.core.cache import cache

In [2]: cache.set('counter',0)

In [3]: cache.incr('counter')
Out[3]: 1

In [4]: cache.incr('counter')
Out[4]: 2

Update

What happens if you force your tasks to overlap by sleeping, e.g.:

print "Task on %r started" % (self,)
sleep(20)
print "Task on %r stopped" % (self,)

If you don't get "already in use..." when running this more frequently than every 20 seconds, then you know that the cache isn't behaving as expected.
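
As a rough sketch of what that overlap test should look like when the cache does work, the Python 3 snippet below runs the task body from the question twice, with the second run starting while the first still holds the lock. `InMemoryCache` is a hypothetical stand-in for a working shared cache (expiry is ignored), threads stand in for workers, and events replace `sleep(20)` so the overlap is deterministic.

```python
import threading


class InMemoryCache:
    """Hypothetical stand-in for a working shared cache (expiry is ignored)."""

    def __init__(self):
        self._data = {}
        self._guard = threading.Lock()

    def add(self, key, value, timeout=None):
        # Atomic "set if absent", like cache.add()
        with self._guard:
            if key in self._data:
                return False
            self._data[key] = value
            return True

    def delete(self, key):
        with self._guard:
            self._data.pop(key, None)


cache = InMemoryCache()
log = []
first_holds_lock = threading.Event()
second_has_tried = threading.Event()


def run_task(name, hold_until=None):
    """Same shape as test() in the question, logging instead of printing."""
    if cache.add("lock", "true", 300):
        try:
            log.append(name + ": pre")
            first_holds_lock.set()
            if hold_until is not None:
                hold_until.wait(timeout=5)  # stands in for sleep(20)
            log.append(name + ": post")
        finally:
            cache.delete("lock")
        return
    log.append(name + ": already in use...")


first = threading.Thread(target=run_task, args=("run 1", second_has_tried))
first.start()
first_holds_lock.wait(timeout=5)  # wait until run 1 owns the lock
run_task("run 2")                 # overlaps with run 1
second_has_tried.set()            # let run 1 finish
first.join()
print(log)
# ['run 1: pre', 'run 2: already in use...', 'run 1: post']
```

If the real task never reaches the "already in use..." branch under the same overlap, the `add` call is succeeding when it should not.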


Another Update

Have you set up a cache backend in your Django settings? E.g. memcached.

If not, you may be using the Dummy Cache, which doesn't actually cache anything and only implements the interface. That sounds like a convincing cause of your problem.
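
A small sketch of why a no-op cache would produce exactly this symptom, in Python 3 syntax. Both classes are hypothetical stand-ins written for illustration, not Django's own backends: `DummyStyleCache` assumes a backend whose `add()` stores nothing and always reports success, while `SharedStyleCache` assumes a backend that really remembers keys (expiry is ignored).

```python
class DummyStyleCache:
    """Hypothetical no-op backend: stores nothing, add() always succeeds."""

    def add(self, key, value, timeout=None):
        return True

    def delete(self, key):
        return False


class SharedStyleCache:
    """Hypothetical stand-in for a real backend such as memcached."""

    def __init__(self):
        self._data = {}

    def add(self, key, value, timeout=None):
        # Succeeds only if the key is not already present
        if key in self._data:
            return False
        self._data[key] = value
        return True

    def delete(self, key):
        return self._data.pop(key, None) is not None


def try_overlapping_runs(cache, lock_id="lock"):
    """Simulate a second run starting while the first still holds the lock."""
    first = cache.add(lock_id, "true", 300)
    second = cache.add(lock_id, "true", 300)
    cache.delete(lock_id)
    return first, second


dummy_result = try_overlapping_runs(DummyStyleCache())
shared_result = try_overlapping_runs(SharedStyleCache())
print(dummy_result)   # (True, True): both runs think they hold the lock
print(shared_result)  # (True, False): the second run is turned away
```

With the no-op backend every run acquires the lock, so the "already in use..." branch is never reached.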

answered Feb 04 '26 10:02

MattH


