How to reset luigi task status?

Currently, I have a bunch of Luigi tasks queued together with a simple dependency chain (a -> b -> c -> d). d gets executed first and a last; a is the task that gets triggered.

All the tasks except a return a luigi.LocalTarget() object as their output and take a single generic luigi.Parameter(), which is a string containing a date and a time. Everything runs on a Luigi central server (which has history enabled).

The problem is that when I rerun task a, Luigi checks the history to see whether that particular task has been run before. If it has a status of DONE, Luigi doesn't run the tasks (d in this case), and I can't have that. Changing the string isn't helping (I added a random microsecond to it). How do I force a task to run?

HackToHell asked Jan 05 '16 13:01



2 Answers

First, a comment: Luigi tasks are idempotent. If you run a task with the same parameter values, no matter how many times you run it, it must always produce the same outputs, so it doesn't make sense to run it more than once. This is what makes Luigi powerful: if you have one big task that does a lot of things and takes a long time, and it fails somewhere, you'll have to run it again from the beginning. If you split it into smaller tasks and the run fails, you'll only have to run the tasks in the pipeline that haven't completed yet.

When you run a task, Luigi checks whether that task's outputs exist. If they don't, Luigi checks the outputs of the tasks it depends on. If those exist, it runs only the current task and generates its output Target. If the dependencies' outputs don't exist, it runs those tasks first.
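The check described above can be modelled in a few lines of plain Python. This is only a simplified sketch, not Luigi's scheduler code: SketchTask, tasks_to_run and demo are hypothetical names, and each task is assumed to have exactly one output file.

```python
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
from typing import List


@dataclass
class SketchTask:
    """Hypothetical stand-in for a Luigi task: a name, one output file, dependencies."""
    name: str
    output: Path
    requires: List["SketchTask"] = field(default_factory=list)

    def complete(self) -> bool:
        # Same rule as described above: a task is done when its output exists.
        return self.output.exists()


def tasks_to_run(task: SketchTask) -> List[str]:
    """Return the names of the tasks that would run, dependencies first."""
    if task.complete():
        # The output exists, so this task and everything below it is skipped.
        return []
    order: List[str] = []
    for dep in task.requires:
        for name in tasks_to_run(dep):
            if name not in order:
                order.append(name)
    order.append(task.name)
    return order


def demo() -> List[str]:
    """Build the chain a -> b -> c -> d with only c's output on disk."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        d = SketchTask("d", root / "d.txt")
        c = SketchTask("c", root / "c.txt", [d])
        b = SketchTask("b", root / "b.txt", [c])
        a = SketchTask("a", root / "a.txt", [b])
        (root / "c.txt").write_text("done")
        return tasks_to_run(a)


print(demo())  # ['b', 'a']
```

In this model d is never visited, even though d.txt is missing, because c already counts as complete. That matches the behaviour in the question, where d was not rerun.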

So, if you want to rerun a task, you must delete its output Targets. And if you want to rerun the whole pipeline, you must delete, in cascade, the outputs of all the tasks that the task depends on.

There's an ongoing discussion in this issue in the Luigi repository. Take a look at this comment, since it will point you to some scripts for getting the output targets of a given task and removing them.

matagus answered Sep 20 '22 14:09


I typically do this by overriding complete():

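import luigi


class BaseTask(luigi.Task):

    # Set force=True to delete existing outputs so that the task reruns.
    force = luigi.BoolParameter(default=False)

    def complete(self):
        outputs = luigi.task.flatten(self.output())
        if self.force:
            # Stale outputs are removed on every complete() call, not just the first.
            for output in outputs:
                if output.exists():
                    output.remove()
        return all(output.exists() for output in outputs)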


class MyTask(BaseTask):
    def output(self):
        return luigi.LocalTarget("path/to/done/file.txt")

    def run(self):
        with self.output().open('w') as out_file:
            out_file.write('Complete')

When you run the task, the output file is created as expected. Upon instantiating the class with force=True, the output file will still exist until complete() is called.

task = MyTask()
task.run()
task.complete()
# True

new_task = MyTask(force=True)
new_task.output().exists()
# True
new_task.complete()
# False
new_task.output().exists()
# False

cangers answered Sep 20 '22 14:09