I am trying to set a maximum run time for my celery jobs.
I am currently recovering from exceptions with a context manager. I ended up with code very similar to this snippet:
import logging

from celery.exceptions import SoftTimeLimitExceeded

logger = logging.getLogger(__name__)  # any configured logger


class Manager:
    def __enter__(self):
        return self

    def __exit__(self, error_type, error, tb):
        if error_type == SoftTimeLimitExceeded:
            logger.info('job killed.')
            # swallow the exception
            return True


@task  # celery's task decorator, e.g. app.task
def do_foo():
    with Manager():
        run_task1()
        run_task2()
        run_task3()
What I expected:
If do_foo times out in run_task1, the logger logs, the SoftTimeLimitExceeded exception is swallowed, the rest of the body of the manager is skipped, and the job ends without running run_task2 and run_task3.
What I observe:
do_foo times out in run_task1, SoftTimeLimitExceeded is raised, the logger logs, and the SoftTimeLimitExceeded exception is swallowed, but run_task2 and run_task3 run nevertheless.
I am looking for answers to the following two questions:
1. Why is run_task2 still executed when SoftTimeLimitExceeded is raised in run_task1 in this setting?
2. Is there an easy way to transform my code so that it performs as expected?
A context manager usually takes care of setting up some resource, e.g. opening a connection, and automatically handles the clean-up when we are done with it. Probably the most common use case is opening a file: the snippet below opens the file and keeps it open until we are out of the with statement, which then closes it.
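A minimal sketch (the file name is just a placeholder):

with open('example.txt', 'w') as f:   # the file is opened here
    f.write('hello')                  # it stays open inside the block
# leaving the with statement closes the file automatically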
__enter__ and __exit__ are the methods invoked on entry to and exit from the body of the with statement (PEP 343), and an object implementing both is called a context manager. The with statement is intended to hide the flow control of the try/finally clause and make the code more readable.
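Concretely, the with statement in the question roughly desugars to the try/finally-style flow below (a simplified sketch of the PEP 343 expansion; body() stands in for the statements inside the with block):

import sys

manager = Manager()
manager.__enter__()
try:
    body()  # the statements inside the with block
except BaseException:
    # __exit__ receives (type, value, traceback);
    # a true return value swallows the exception
    if not manager.__exit__(*sys.exc_info()):
        raise
else:
    # clean exit: __exit__ is still called, with (None, None, None)
    manager.__exit__(None, None, None)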
Context managers allow you to allocate and release resources precisely when you want to, and the most common way to use them is the with statement. Suppose you have two related operations which you'd like to execute as a pair, with a block of code in between.
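contextlib offers a shorthand for exactly this pairing; a minimal sketch (the print calls just mark the two paired operations):

from contextlib import contextmanager

@contextmanager
def managed():
    print('setup')        # runs on entry, like __enter__
    try:
        yield             # the with block executes here
    finally:
        print('cleanup')  # runs on exit, like __exit__

with managed():
    print('inside the block')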
From the celery documentation: SoftTimeLimitExceeded — the soft time limit has been exceeded. This exception is raised to give the task a chance to clean up.
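For reference, the celery documentation's own pattern for this clean-up is a plain try/except inside the task rather than a context manager; a sketch along those lines (the soft_time_limit value is illustrative, and the run_task* helpers and logger are those from the question):

from celery.exceptions import SoftTimeLimitExceeded

@task(soft_time_limit=60)  # seconds; illustrative value
def do_foo():
    try:
        run_task1()
        run_task2()
        run_task3()
    except SoftTimeLimitExceeded:
        logger.info('job killed.')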
The code in the question is pretty good; there's not much cleaning up to do. Two small points:
- There is no need to return self from __enter__ if the context manager isn't designed to be used with the as keyword.
- is should be used when checking classes, since they are singletons, but issubclass is better still, as it properly emulates exception handling, which also matches subclasses (see the short demo after the revised code).
Implementing these changes gives:
from celery.exceptions import SoftTimeLimitExceeded

# logger and task as in the question


class Manager:
    def __enter__(self):
        pass

    def __exit__(self, error_type, error, tb):
        # error_type is None when the with block exits cleanly
        if error_type is not None and issubclass(error_type, SoftTimeLimitExceeded):
            logger.info('job killed.')
            # swallow the exception
            return True


@task
def do_foo():
    with Manager():
        run_task1()
        run_task2()
        run_task3()
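To see why issubclass matters here, note that except clauses also catch subclasses, while is and == only match the exact class. A small demo with a hypothetical subclass (ChildLimitExceeded exists only for illustration):

from celery.exceptions import SoftTimeLimitExceeded

class ChildLimitExceeded(SoftTimeLimitExceeded):
    pass

print(ChildLimitExceeded is SoftTimeLimitExceeded)              # False
print(ChildLimitExceeded == SoftTimeLimitExceeded)              # False
print(issubclass(ChildLimitExceeded, SoftTimeLimitExceeded))    # True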
I created a mock environment for debugging:
class SoftTimeLimitExceeded(Exception):
    pass


class Logger:
    info = print

logger = Logger()
del Logger


def task(f):
    return f


def run_task1():
    print("running task 1")
    raise SoftTimeLimitExceeded


def run_task2():
    print("running task 2")


def run_task3():
    print("running task 3")
Executing this and then your program gives:
>>> do_foo()
running task 1
job killed.
This is the expected behaviour.
I can think of two possibilities:
- run_task1 is asynchronous.
- celery is doing something weird.
I'll run with the second hypothesis because I can't test the former.
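For what the first hypothesis would look like: if run_task1 only schedules work and returns, the exception is raised outside the with block, so the manager never sees it. A sketch reusing the mock definitions above, with a thread standing in for an asynchronous call (not how celery actually dispatches work):

import threading
import time

def run_task1():
    def work():
        time.sleep(0.1)
        raise SoftTimeLimitExceeded  # raised in another thread...
    threading.Thread(target=work).start()
    # ...after run_task1 has already returned

with Manager():
    run_task1()  # returns immediately, so nothing is caught here
    run_task2()  # still runs
    run_task3()  # still runs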
I've been bitten by the obscure behaviour of a combination of context managers, exceptions and coroutines before, so I know what sorts of problems it causes. This seems like one of them, but I'll have to look at celery's code before I can go any further.
Edit: I can't make head nor tail of celery's code, and searching hasn't turned up the code that raises SoftTimeLimitExceeded to allow me to trace it backwards. I'll pass it on to somebody more experienced with celery to see if they can work out how it works.