I'm trying to grasp how luigi works, and I get the idea, but actual implementation is a bit harder ;) This is what i have:
class MyTask(luigi.Task):
x = luigi.IntParameter()
def requires(self):
return OtherTask(self.x)
def run(self):
print(self.x)
class OtherTask(luigi.Task):
x = luigi.IntParameter()
def run(self):
y = self.x + 1
print(y)
And this fails with RuntimeError: Unfulfilled dependency at run time: OtherTask_3_5862334ee2
. I've figured that I need to produce output using def output(self):
to workaround this issue\feature. And I can't comprehend how do I produce reasonable output without writing to a file, say:
def output(self):
return luigi.LocalTarget('words.txt')
def run(self):
words = [
'apple',
'banana',
'grapefruit'
]
with self.output().open('w') as f:
for word in words:
f.write('{word}\n'.format(word=word))
I've tried reading the documentation, but I can't understand the concept behind output at all. What if I need to output to screen only. What if I need to output an object to another task? Thanks!
By default, Luigi tasks run using the Luigi scheduler. To run one of your previous tasks using the Luigi scheduler omit the --local-scheduler argument from the command. Re-run the task from Step 3 using the following command: python -m luigi --module word-frequency GetTopBooks.
A Luigi Task describes a unit or work. The key methods of a Task, which must be implemented in a subclass are: * :py:meth:`run` - the computation done by this task. * :py:meth:`requires` - the list of Tasks that this Task depends on. * :py:meth:`output` - the output :py:class:`Target` that this Task creates.
GitHub - spotify/luigi: Luigi is a Python module that helps you build complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization etc. It also comes with Hadoop support built in. Skip to content Toggle navigation.
What if I need to output an object to another task?
Luigi tasks can run in different processes. Therefore you do usually have to write to disk, a database, pickle, or some external mechanism that allows data to be exchanged between the processes (and the existence of which can be verified) if you want to exchange an object that is the result of a task.
As opposed to writing the output() method, which requires a target, you can also override the complete() method where you can write any custom logic that allows the tasks to be considered complete.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With