
Event Handling in Python Luigi

I've been trying to integrate Luigi as our workflow handler. We currently use Concourse, but many of the things we're trying to do are a hassle to work around in Concourse, so we switched to Luigi as our dependency manager. No problems so far: workflows trigger and execute properly.

The issue comes in when a task fails for whatever reason. In this case it is specifically the requires block of a task, but all cases need to be taken care of. Right now Luigi handles the error gracefully and writes it to STDOUT. It still emits an exit code of 0, though, which to Concourse means the job passed. A false positive.

I've been trying to get the event handling to fix this, but I cannot get it to trigger, even with an extremely simple job:

import sys
import luigi
import luigi.retcodes

@luigi.Task.event_handler(luigi.Event.FAILURE)
def mourn_failure(task, exception):
    with open('/root/luigi', 'a') as f:
        f.write("we got the exception!")  # testing in concourse image
    sys.exit(luigi.retcodes.retcode().unhandled_exception)

class Test(luigi.Task):
    def requires(self):
        raise Exception()
        return []

    def run(self):
        pass

    def output(self):
        return []

Then I run the following in a Python shell:

luigi.run(main_task_cls=Test, local_scheduler=True)

The exception gets raised, but the event doesn't seem to fire. The file doesn't get written and the exit code is still 0.

Also, in case it makes a difference, my Luigi config is at /etc/luigi/client.cfg and contains:

[retcode]
already_running=10
missing_data=20
not_run=25
task_failed=30
scheduling_error=35
unhandled_exception=40

I'm at a loss as to why the event handler won't trigger, but somehow I need the process to fail on an error.

Cameron Otts asked Feb 28 '17 19:02



2 Answers

The problem seems to be where you raise the exception. requires() is called while Luigi resolves dependencies, before the run method of your Test task is ever invoked. So from Luigi's point of view the Test task itself did not fail; what broke is the resolution of its dependencies (currently an empty list), and the FAILURE event is never emitted.

For example, if you move the raise into run, your code will behave as you expect.

    def run(self):
        print('start')
        raise Exception()

To handle the case where a dependency fails (here, the exception is raised in the requires method), you can register a handler for another Luigi event: luigi.Event.BROKEN_TASK. This will make sure Luigi emits the non-zero return code you expect.

Cheers!

Tom answered Oct 10 '22 16:10


If you'd like to catch exceptions in requires(), use the following:

@luigi.Task.event_handler(luigi.Event.BROKEN_TASK)
def mourn_failure(task, exception):
    ...
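
Here is a minimal sketch of how the two handlers could be combined to get a non-zero exit code. It rests on a few assumptions: that BROKEN_TASK is emitted when requires() raises during scheduling, that luigi.build returns a boolean success flag, and that recording the failure is more reliable than calling sys.exit() inside the handler. The names record_failure, exit_code, failures and main, and the default code 30 (borrowed from task_failed in the question's config), are hypothetical and chosen only for illustration.

```python
import sys

# Failures recorded by the event handlers: (event name, task repr, exception repr).
failures = []


def record_failure(event_name, task, exception):
    """Remember a failure instead of exiting from inside the handler."""
    failures.append((event_name, repr(task), repr(exception)))


def exit_code(scheduling_succeeded, recorded, failure_code=30):
    """Return 0 only if scheduling succeeded and nothing was recorded.

    failure_code is an arbitrary, hypothetical default for this sketch.
    """
    return 0 if scheduling_succeeded and not recorded else failure_code


def main():
    # Imported here so the helpers above can be used without Luigi installed.
    import luigi

    @luigi.Task.event_handler(luigi.Event.BROKEN_TASK)
    def on_broken_task(task, exception):
        # Assumed to fire when requires() raises while dependencies are resolved.
        record_failure("BROKEN_TASK", task, exception)

    @luigi.Task.event_handler(luigi.Event.FAILURE)
    def on_failure(task, exception):
        # Fires when run() raises.
        record_failure("FAILURE", task, exception)

    class Test(luigi.Task):
        def requires(self):
            raise Exception("broken requires")

    # Assumed to return a boolean success flag by default.
    ok = luigi.build([Test()], local_scheduler=True)
    sys.exit(exit_code(ok, failures))
```

Calling main() from the job's entry point should then end the process with a non-zero status whenever either handler has recorded something, which is what Concourse needs to mark the job as failed.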

cangers answered Oct 10 '22 16:10