TL;DR
To run an initialization function for each process spawned by Celery, you can use the worker_process_init signal. As the docs note, handlers for that signal must not block for more than 4 seconds.
But what are my options if I have to run an init function that takes longer than 4 seconds to execute?
Problem
I use a C extension module to run certain operations within Celery tasks. This module requires an initialization that can take several seconds (roughly 4-10). Since I would prefer not to run this init function for every task, but once per spawned process, I made use of the worker_process_init signal:
```python
# lib.py
import isclient  # C extension module

client = None

def init():
    global client
    client = isclient.Client()  # this might take a while

def create_ne_list(text):
    return client.ne_receiventities4datachunk(text)
```
```python
# celery.py
from celery import Celery
from celery.signals import worker_process_init
from lib import init

celery = Celery(include=[
    'isc.ne.tasks'
])
celery.config_from_object('celeryconfig')

@worker_process_init.connect
def process_init(sender=None, conf=None, **kwargs):
    init()

if __name__ == '__main__':
    celery.start()
```
```python
# tasks.py
from celery import celery
from lib import create_ne_list as cnl

@celery.task(time_limit=1200)
def create_ne_list(text):
    return cnl(text)
```
What happens when I run this code is what I described in my earlier question (Celery: stuck in infinitely repeating timeouts (Timed out waiting for UP message)). In short: since my init function takes longer than 4 seconds, a worker sometimes gets killed and restarted, and then gets killed again during the restart, because that is what automatically happens after 4 seconds of unresponsiveness. This eventually results in an endless kill-and-restart loop.
Another option would be to run my init function only once per worker, using the worker_init signal. If I do that, I get a different problem: now the queued-up tasks get stuck for some reason.
When I start the worker with a concurrency of 3 and then send a couple of tasks, the first three get finished, but the remaining ones are never touched. (I assume it has something to do with the fact that the client object needs to be shared between multiple processes and that the C extension, for some reason, doesn't support that. But to be honest, I'm relatively new to multiprocessing, so I can only guess.)
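For reference, the worker_init variant described above would look roughly like this (a sketch; the handler name is illustrative). The key difference is that worker_init fires once in the parent worker process before the pool children are forked, so the initialized client object is inherited by every child, which is where the C extension appears to misbehave:

```python
# celery.py -- worker_init variant (sketch of the approach described above)
from celery import Celery
from celery.signals import worker_init
from lib import init

celery = Celery(include=['isc.ne.tasks'])
celery.config_from_object('celeryconfig')

@worker_init.connect
def on_worker_init(sender=None, **kwargs):
    # Runs once in the parent worker process; with the prefork pool the
    # initialized client is then inherited by the forked child processes.
    init()
```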
Question
So the question remains: how can I run a per-process init function that takes longer than 4 seconds? Is there a correct way to do that, and what would it be?
Celery limits the process init timeout to 4.0 seconds. Check the source code.
To work around this limit, you can change it before you create the Celery app:

```python
from celery.concurrency import asynpool

asynpool.PROC_ALIVE_TIMEOUT = 10.0  # set this long enough for your init
```

Note that there is no configuration option or setting to change this value.
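Putting this workaround together with the question's setup, the override has to happen at import time, before the worker starts its pool; a minimal sketch (the timeout value is an assumption, pick one comfortably above your init's worst case):

```python
# celery.py -- apply the timeout override before creating the app
from celery.concurrency import asynpool

# Must run before the prefork pool spawns its child processes.
asynpool.PROC_ALIVE_TIMEOUT = 15.0  # seconds; give the slow client init room

from celery import Celery
from celery.signals import worker_process_init
from lib import init

celery = Celery(include=['isc.ne.tasks'])
celery.config_from_object('celeryconfig')

@worker_process_init.connect
def process_init(sender=None, conf=None, **kwargs):
    init()  # may now take longer than the default 4 seconds
```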
@changhwan's answer is no longer the only method as of celery 4.4.0. Here is the pull request that added the config option for this feature.
With celery ^4.4.0, this value is configurable. Use the Celery application config option worker_proc_alive_timeout. From the stable version docs:
worker_proc_alive_timeout
Default: 4.0.
The timeout in seconds (int/float) when waiting for a new worker process to start up.
```python
from celery import Celery
from celery.signals import worker_process_init

app = Celery('app')
app.conf.worker_proc_alive_timeout = 10

@worker_process_init.connect
def long_init_function(*args, **kwargs):
    import time
    time.sleep(8)
```