Celery Worker sleep not working correctly

I have the following problem: I have a Python process that must wait X seconds. The process works correctly on its own; the problem appears when I run it as a Celery task.

When the worker calls time.sleep(X) in one task, it pauses all the tasks in that worker. For example:

Worker A can run 4 tasks at the same time (q, w, e and r), and task r sleeps for 1800 seconds. The worker runs all 4 tasks concurrently, but when task r sleeps, the worker stops q, w and e too.

Is this normal? Do you know how I can solve this problem?

EDIT: here is an example of my celery.py with the beat schedule and queues:

from celery.schedules import crontab
from kombu import Queue

app.conf.update(
CELERY_DEFAULT_QUEUE='default',
CELERY_QUEUES=(
    Queue('search', routing_key='search.#'),
    Queue('tests', routing_key='tests.#'),
    Queue('default',    routing_key='tasks.#'),
),

CELERY_DEFAULT_EXCHANGE='tasks',
CELERY_DEFAULT_EXCHANGE_TYPE='topic',
CELERY_DEFAULT_ROUTING_KEY='tasks.default',
CELERY_TASK_RESULT_EXPIRES=10,
CELERYD_TASK_SOFT_TIME_LIMIT=1800,
CELERY_ROUTES={
    'tests.tasks.volume': {
        'queue': 'tests',
        'routing_key': 'tests.volume',
    },
    'tests.tasks.summary': {
        'queue': 'tests',
        'routing_key': 'tests.summary',
    },
    'search.tasks.links': {
        'queue': 'search',
        'routing_key': 'search.links',
    },
    'search.tasks.urls': {
        'queue': 'search',
        'routing_key': 'search.urls',
    },
},

CELERYBEAT_SCHEDULE={
    # heavy one
    'each-hour-summary': {
        'task': 'tests.tasks.summary',
        'schedule': crontab(minute='0', hour='*/1'),
        'args': (),
    },
    'each-hour-volume': {
        'task': 'tests.tasks.volume',
        'schedule': crontab(minute='0', hour='*/1'),
        'args': (),
    },
    'links-each-cuarter': {
        'task': 'search.tasks.links',
        'schedule': crontab(minute='*/15'),
        'args': (),
    },
    'urls-each-ten': {
        'schedule': crontab(minute='*/10'),
        'task': 'search.tasks.urls',
        'args': (),
    },
}
)

tests/tasks.py:

@app.task
def summary():
    execute_sumary()  # heavy task, takes about 1 hour

@app.task
def volume():
    execute_volume()  # not important, takes less than 5 minutes

and search/tasks.py:

import time

@app.task
def links():
    free = search_links()  # returns a boolean
    if free:
        process_links()
    else:
        time.sleep(1080)  # <-- the sleep I am having problems with
    process_links()

@app.task
def urls():
    execute_urls()  # not important, takes less than 1 minute

I have two workers: A for the search queue and B for the tests and default queues.

The problem is with A: when it takes the "links" task and executes time.sleep(), it stops the other tasks that the worker is running.

Because worker B is working correctly, I think the problem is the time.sleep() function.

Luis Hernandez Donadeu asked Jan 19 '15 12:01

1 Answer

If the worker has only one process/thread, a call to sleep() will block it. This means that no other task will run until the sleep returns...
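To illustrate the point, here is a minimal sketch that uses a plain thread pool from Python's standard library instead of Celery itself. The function names (sleepy_task, quick_task, time_until_quick_done) and the 0.5 second sleep are made up for the demonstration; it only shows the general blocking behaviour, not how your particular worker is configured.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def sleepy_task(seconds):
    """Hypothetical stand-in for a task that calls time.sleep()."""
    time.sleep(seconds)
    return "sleepy"


def quick_task():
    """Hypothetical stand-in for a short task that should not have to wait."""
    return "quick"


def time_until_quick_done(pool_size, sleep_seconds=0.5):
    """Submit the sleepy task first, then the quick one.

    Returns how long the quick task took to finish, measured from
    just before the first submission.
    """
    with ThreadPoolExecutor(max_workers=pool_size) as pool:
        start = time.monotonic()
        pool.submit(sleepy_task, sleep_seconds)
        quick = pool.submit(quick_task)
        quick.result()  # wait only for the quick task
        return time.monotonic() - start


# One thread: the quick task queues behind the sleeping one.
single = time_until_quick_done(pool_size=1)
# Two threads: the quick task runs while the other thread sleeps.
multi = time_until_quick_done(pool_size=2)

print(f"1 thread:  quick task done after {single:.2f}s")
print(f"2 threads: quick task done after {multi:.2f}s")
```

With a single thread the quick task has to wait for the whole sleep; with a second thread it finishes almost immediately. If your worker A behaves like the first case, it is worth checking how many processes/threads it was actually started with.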

DejanLekic answered Oct 02 '22 16:10