I've been trying to integrate Luigi as our workflow handler. We currently use Concourse, but many of the things we're trying to do are a hassle to work around in Concourse, so we switched to Luigi as our dependency manager. No problems so far: workflows trigger and execute properly.
The issue comes when a task fails for whatever reason. In this case it is specifically the requires block of a task, but all cases need to be handled. Right now Luigi handles the error gracefully and writes it to STDOUT. It still exits with code 0, though, which to Concourse means the job passed: a false positive.
I've been trying to get the event handling to fix this, but I cannot get it to trigger, even with an extremely simple job:

    import sys

    import luigi

    @luigi.Task.event_handler(luigi.Event.FAILURE)
    def mourn_failure(task, exception):
        with open('/root/luigi', 'a') as f:
            f.write("we got the exception!")  # testing in concourse image
        sys.exit(luigi.retcodes.retcode().unhandled_exception)

    class Test(luigi.Task):
        def requires(self):
            raise Exception()
            return []

        def run(self):
            pass

        def output(self):
            return []

Then I run this in a Python shell:

    luigi.run(main_task_cls=Test, local_scheduler=True)

The exception gets raised, but the event doesn't seem to fire: the file doesn't get written and the exit code is still 0.
Also, if it makes a difference, I have my Luigi config at /etc/luigi/client.cfg, which contains:

    [retcode]
    already_running=10
    missing_data=20
    not_run=25
    task_failed=30
    scheduling_error=35
    unhandled_exception=40

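To double-check which outcome maps to which exit code, here is a minimal sketch that parses the same [retcode] text with Python's standard configparser. This is not Luigi's own config loader, and the names SAMPLE_CFG and load_retcodes are made up for illustration.

```python
import configparser

# Same text as the [retcode] section of /etc/luigi/client.cfg shown above.
SAMPLE_CFG = """
[retcode]
already_running=10
missing_data=20
not_run=25
task_failed=30
scheduling_error=35
unhandled_exception=40
"""


def load_retcodes(cfg_text):
    """Return the [retcode] section as a {name: int} mapping (hypothetical helper)."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    return {name: int(value) for name, value in parser["retcode"].items()}


retcodes = load_retcodes(SAMPLE_CFG)
for name, code in sorted(retcodes.items(), key=lambda item: item[1]):
    print(f"{code:>3}  {name}")
```

Every configured code is non-zero, so if one of them actually reached Concourse the job would be marked as failed.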
I'm at a loss as to why the event handler won't trigger, but somehow I need the process to fail on an error.
A Luigi Task describes a unit of work. The key methods of a Task, which must be implemented in a subclass, are: run (the computation done by this task), requires (the list of Tasks that this Task depends on), and output (the output Target that this Task creates).
The worker module: the worker communicates with the scheduler and does two things. It sends all tasks that have to be run, and it gets tasks from the scheduler that should be run.
Run pip install luigi to install the latest stable version from PyPI. Documentation for the latest release is hosted on readthedocs. Run pip install luigi[toml] to install Luigi with TOML-based configs support. For the bleeding edge code, pip install git+https://github.com/spotify/luigi.git .
It seems like the problem is where you place the "raise Exception" call. requires() is called while Luigi is scheduling the task, before your Test task's run method is ever invoked. So Luigi does not treat this as Test failing in run, which is what luigi.Event.FAILURE reports; it treats the task's dependencies as broken.
For example, if you move the raise to run, your code will behave as you expect:

    def run(self):
        print('start')
        raise Exception()

To handle the case where resolving your dependencies fails (here, the exception is raised in the requires method), you can register a handler for another Luigi event, luigi.Event.BROKEN_TASK. From that handler you can record the failure so that the process can exit with the non-zero return code you expect.
Cheers!
If you'd like to catch exceptions in requires(), use the following:

    @luigi.Task.event_handler(luigi.Event.BROKEN_TASK)
    def mourn_failure(task, exception):
        ...

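Putting the two handlers together, a rough sketch of how a wrapper script could turn either event into a non-zero exit code might look like this. The names failures, record_failure, exit_code and main, and the choice of exit code 1, are hypothetical. It also assumes that luigi.build returns a truthy value only when everything succeeded, which matches the versions I have seen but is worth verifying against yours. Luigi is imported inside main only so the small helpers can be exercised without it.

```python
failures = []  # (task, exception) pairs collected by the event handlers


def record_failure(task, exception):
    """Event callback: remember which task broke and why."""
    failures.append((str(task), repr(exception)))


def exit_code(build_succeeded, recorded):
    """Return 0 only if the build succeeded and no handler fired."""
    return 0 if build_succeeded and not recorded else 1


def main():
    import luigi  # deferred so the helpers above do not depend on Luigi

    class Test(luigi.Task):
        def requires(self):
            raise Exception("broken dependency")  # fires BROKEN_TASK

        def run(self):
            pass

        def output(self):
            return []

    # Same registration as the decorator form, applied to a plain function.
    luigi.Task.event_handler(luigi.Event.BROKEN_TASK)(record_failure)
    luigi.Task.event_handler(luigi.Event.FAILURE)(record_failure)

    succeeded = luigi.build([Test()], local_scheduler=True)
    return exit_code(succeeded, failures)
```

In the script that Concourse runs, finishing with sys.exit(main()) (after import sys) would pass that code on to the job. Exiting from the top level, rather than calling sys.exit inside the handler, avoids depending on how Luigi treats exceptions raised in callbacks.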