 

Luigi - Overriding Task requires/input

I am using luigi to execute a chain of tasks, like so:

import luigi


class Task1(luigi.Task):
    stuff = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('test.json')

    def run(self):
        with self.output().open('w') as f:
            f.write(self.stuff)


class Task2(luigi.Task):
    stuff = luigi.Parameter()

    def requires(self):
        return Task1(stuff=self.stuff)

    def output(self):
        return luigi.LocalTarget('something-else.json')

    def run(self):
        with self.output().open('w') as f:
            f.write(self.stuff)

This works exactly as desired when I start the entire workflow like so:

luigi.build([Task2(stuff='stuff')])

When using luigi.build you can also run multiple tasks by explicitly passing arguments, as per this example in the documentation.

However, in my situation, I would also like to be able to run the business logic of Task2 completely independently of its involvement in the workflow. This works fine for tasks that do not implement requires, as per this example.

My question is: how can I run this task both as part of the workflow and on its own? I could add a private method such as _my_custom_run that takes the data and returns the result, and then call it from run, but this feels like something that should be built into the framework, which makes me suspect I am misunderstanding Luigi's best practices (I am still learning the framework). Any advice is appreciated, thanks!

MrName asked Nov 07 '22 07:11



1 Answer

It sounds like you want dynamic dependencies. Using the pattern shown in that example, you could read a config or pass a parameter containing arbitrary data, and then yield from run() only the tasks you want to require, based on the fields in that config.

# tasks.py
import luigi
import json
import time


class Parameterizer(luigi.Task):
    params = luigi.Parameter() # Arbitrary JSON

    def output(self):
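        # task_id encodes a hash of the params, so different params values
        # get separate files instead of silently reusing one stale config.
        return luigi.LocalTarget('./config-{}.json'.format(self.task_id))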

    def run(self):
        with self.output().open('w') as f:
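            # self.params is a JSON string; parse it so the file holds a JSON
            # object rather than a quoted string.
            json.dump(json.loads(self.params), f)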

class Task1(luigi.Task):
    stuff = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('{}'.format(self.stuff[:6]))

    def run(self):
        with self.output().open('w') as f:
            f.write(self.stuff)


class Task2(luigi.Task):
    stuff = luigi.Parameter()
    params = luigi.Parameter()


    def output(self):
        return luigi.LocalTarget('{}'.format(self.stuff[6:]))

    def run(self):

        config = Parameterizer(params=self.params)
        yield config

        with config.output().open() as f:
            parameters = json.load(f)

        if parameters["runTask1"]:
            yield Task1(stuff=self.stuff)
        else:
            pass
        with self.output().open('w') as f:
            f.write(self.stuff)

if __name__ == '__main__':
    print("Trying to run with Task1...")
    luigi.build([Task2(stuff="Task 1Task 2", params='{"runTask1":true}')], local_scheduler=True)

    time.sleep(10)

    print("Trying to run WITHOUT Task1...")
    luigi.build([Task2(stuff="Task 1Did just task 2", params='{"runTask1":false}')], local_scheduler=True)

(This is executed by simply calling python tasks.py)

We could easily imagine mapping more than one parameter to more than one task, or applying custom tests before allowing various tasks to execute. We could also rewrite this to take the params from luigi.Config.

Also note the following control flow from Task2:

    if parameters["runTask1"]:
        yield Task1(stuff=self.stuff)
    else:
        pass

Here we could run an alternative task, or dynamically call tasks as we saw in the example from the luigi repo. For example:

    if parameters["runTask1"]:
        yield Task1(stuff=self.stuff)
    else:
        # luigi.Parameter values are plain strings, so this iterates over the characters of self.stuff
        data_dependent_deps = [Task1(stuff=x) for x in self.stuff]
        yield data_dependent_deps

This may be a bit more involved than a simple run_standalone() method, but I think it's the closest thing to what you were looking for in the documented luigi patterns.

Source: https://luigi.readthedocs.io/en/stable/tasks.html?highlight=dynamic#dynamic-dependencies

Charles Landau answered Nov 15 '22 13:11
