Celery: correct way to run lengthy initialization function (per process)

TLDR;

To run an initialization function for each process that is spawned by celery, you can use the worker_process_init signal. As you can read in the docs, handlers for that signal should not block for more than 4 seconds. But what are the options if I have to run an init function that takes more than 4 seconds to execute?

Problem

I use a C extension module to run certain operations within celery tasks. This module requires an initialization step that might take several seconds (roughly 4-10). Since I would prefer to run this init function once per spawned process rather than once per task, I made use of the worker_process_init signal:

# lib.py
import isclient  # C extension module

client = None

def init():
    global client
    client = isclient.Client()  # this might take a while

def create_ne_list(text):
    return client.ne_receiventities4datachunk(text)

# celery.py
from celery import Celery
from celery.signals import worker_process_init
from lib import init

celery = Celery(include=[
    'isc.ne.tasks'
])

celery.config_from_object('celeryconfig')

@worker_process_init.connect
def process_init(sender=None, conf=None, **kwargs):
    init()

if __name__ == '__main__':
    celery.start()

# tasks.py
from celery import celery  # the app instance defined in celery.py above
from lib import create_ne_list as cnl

@celery.task(time_limit=1200)
def create_ne_list(text):
    return cnl(text)

What happens when I run this code is what I described in my earlier question (Celery: stuck in infinitly repeating timeouts (Timed out waiting for UP message)). In short: since my init function takes longer than 4 seconds, a worker sometimes gets killed and restarted, and during the restart gets killed again, because that is what automatically happens after 4 seconds of unresponsiveness. This eventually results in an endlessly repeating kill-and-restart cycle.

Another option would be to run my init function only once per worker, using the worker_init signal. If I do that, I get a different problem: now the queued-up tasks get stuck for some reason. When I start the worker with a concurrency of 3 and then send a couple of tasks, the first three get finished, but the remaining ones are never touched. (I assume it might have something to do with the fact that the client object needs to be shared between multiple processes and that the C extension, for some reason, doesn't support that. But to be honest, I'm relatively new to multiprocessing, so I can only guess.)
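For reference, a minimal sketch of that worker_init variant (the handler name is mine; the handler runs once in the parent worker process, before the pool children are forked):

# celery.py — worker_init variant (runs once per worker, not once per process)
from celery.signals import worker_init
from lib import init

@worker_init.connect
def parent_init(sender=None, **kwargs):
    init()  # runs once in the parent; forked pool children inherit the client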

Question

So the question remains: how can I run a per-process init function that takes longer than 4 seconds? Is there a correct way to do that, and if so, what would it be?

asked Jun 13 '14 by basilikum

2 Answers

Celery limits the process init timeout to 4.0 seconds. Check the source code.

To work around this limit, you can change the value before you create the Celery app:

from celery.concurrency import asynpool
asynpool.PROC_ALIVE_TIMEOUT = 10.0  # set this long enough

Note that, as of the Celery version current when this was written, there is no configuration setting to change this value.
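Applied to the setup from the question, the override would go at the top of celery.py, before the app is created (a sketch reusing the module names from the question):

# celery.py — override the pool timeout before the app is created
from celery.concurrency import asynpool
asynpool.PROC_ALIVE_TIMEOUT = 10.0  # give the slow per-process init enough headroom

from celery import Celery
celery = Celery(include=['isc.ne.tasks'])
celery.config_from_object('celeryconfig')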

answered by changhwan


@changhwan's answer is no longer the only method as of celery 4.4.0. Here is the pull request that added the config option for this feature.

Use the config option

With celery ^4.4.0, this value is configurable. Use the celery application config option worker_proc_alive_timeout. From the stable version docs:

worker_proc_alive_timeout

Default: 4.0.

The timeout in seconds (int/float) when waiting for a new worker process to start up.

Example:

from celery import Celery
from celery.signals import worker_process_init

app = Celery('app')
app.conf.worker_proc_alive_timeout = 10

@worker_process_init.connect
def long_init_function(*args, **kwargs):
    import time
    time.sleep(8)
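Since the question's app already calls config_from_object('celeryconfig'), the same setting can also live in that config module instead (a sketch, assuming the celeryconfig module from the question):

# celeryconfig.py — the same option as a plain config setting
worker_proc_alive_timeout = 10  # seconds to wait for a new worker process to start up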
answered by Zev Isert