I have the following problem: I'm running a Python process that must wait X seconds. The process works correctly by itself; the problem appears when I run it as a Celery task.
When the worker executes the time.sleep(X) in one task, it pauses all the other tasks in that worker. For example:
I have worker A, which can run 4 tasks at the same time (q, w, e and r). Task r has a sleep of 1800 seconds. The worker runs the 4 tasks concurrently, but when task r hits the sleep, the worker stops q, w and e as well.
Is this normal? Do you know how I can solve this problem?
EDIT: here is an example of my celery.py with the beat schedule and queues:
from kombu import Queue
from celery.schedules import crontab

app.conf.update(
    CELERY_DEFAULT_QUEUE='default',
    CELERY_QUEUES=(
        Queue('search', routing_key='search.#'),
        Queue('tests', routing_key='tests.#'),
        Queue('default', routing_key='tasks.#'),
    ),
    CELERY_DEFAULT_EXCHANGE='tasks',
    CELERY_DEFAULT_EXCHANGE_TYPE='topic',
    CELERY_DEFAULT_ROUTING_KEY='tasks.default',
    CELERY_TASK_RESULT_EXPIRES=10,
    CELERYD_TASK_SOFT_TIME_LIMIT=1800,
    CELERY_ROUTES={
        'tests.tasks.volume': {
            'queue': 'tests',
            'routing_key': 'tests.volume',
        },
        'tests.tasks.summary': {
            'queue': 'tests',
            'routing_key': 'tests.summary',
        },
        'search.tasks.links': {
            'queue': 'search',
            'routing_key': 'search.links',
        },
        'search.tasks.urls': {
            'queue': 'search',
            'routing_key': 'search.urls',
        },
    },
    CELERYBEAT_SCHEDULE={
        # heavy one
        'each-hour-summary': {
            'task': 'tests.tasks.summary',
            'schedule': crontab(minute='0', hour='*/1'),
            'args': (),
        },
        'each-hour-volume': {
            'task': 'tests.tasks.volume',
            'schedule': crontab(minute='0', hour='*/1'),
            'args': (),
        },
        'links-each-cuarter': {
            'task': 'search.tasks.links',
            'schedule': crontab(minute='*/15'),
            'args': (),
        },
        'urls-each-ten': {
            'schedule': crontab(minute='*/10'),
            'task': 'search.tasks.urls',
            'args': (),
        },
    }
)
tests.tasks.py
@app.task
def summary():
    execute_sumary()  # heavy task, ~1 hour approx

@app.task
def volume():
    execute_volume()  # not important, takes less than 5 minutes
and search.tasks.py
import time

@app.task
def links():
    free = search_links()  # returns a boolean
    if free:
        process_links()
    else:
        time.sleep(1080)  # <-------- the sleep that causes my problem
        process_links()

@app.task
def urls():
    execute_urls()  # not important, takes less than 1 minute
Well, I have 2 workers: A for the search queue and B for the tests and default queues.
The problem is with A: when it picks up the "links" task and executes the time.sleep(), it stops the other tasks the worker is running.
Because worker B is working correctly, I think the problem is the time.sleep() call.
If you only have one process/thread, a call to sleep() will block it. This means that no other task will run...
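One way around this, if rescheduling is acceptable, is not to sleep inside the task at all and instead hand the delay back to the broker with Celery's retry mechanism. Here is a minimal sketch of the links task, assuming it is safe to call search_links() again when the task comes back (search_links and process_links are the helpers from the question):

@app.task(bind=True, max_retries=None)
def links(self):
    free = search_links()  # returns a boolean
    if free:
        process_links()
    else:
        # Re-queue this task to run again in 1080 seconds instead of
        # blocking the worker process with time.sleep(1080)
        raise self.retry(countdown=1080)

While the retried task waits in the broker, the worker process stays free to run other tasks from the search queue.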