
How to continuously update a target file using Luigi?

Tags:

python

luigi

I have recently started playing around with Luigi, and I would like to find out how to use it to continuously append new data into an existing target file.

Imagine I am pinging an API every minute to retrieve new data. Because a Task only runs if its Target is not already present, a naive approach would be to parameterize the output file by the current datetime. Here's a bare-bones example:

import luigi
import datetime

class data_download(luigi.Task):
    # DateMinuteParameter keeps minute resolution, matching the filename format below
    date = luigi.DateMinuteParameter(default=datetime.datetime.now())

    def requires(self):
        return []

    def output(self):
        return luigi.LocalTarget("data_test_%s.json" % self.date.strftime("%Y-%m-%d_%H:%M"))

    def run(self):
        data = download_data()
        with self.output().open('w') as out_file:
            out_file.write(data + '\n')

if __name__ == '__main__':
    luigi.run()

If I schedule this task to run every minute, it will execute because the target file for the current time does not exist yet. But it creates 60 files an hour. What I'd like to do instead is make sure that all the new data eventually ends up in the same file. What would be a scalable approach to accomplish that? Any ideas or suggestions are welcome!

mtoto asked Nov 08 '22 00:11


1 Answer

You cannot. As the documentation for LocalTarget says:

Parameters: mode (str) – the mode r opens the FileSystemTarget in read-only mode, whereas w will open the FileSystemTarget in write mode. Subclasses can implement additional options.

That is, only the r and w modes are allowed. Additional options such as a (append) require extending the LocalTarget class, although doing so breaks the idempotency that Luigi expects of task executions.
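To make the idempotency point concrete, here is a minimal sketch in plain Python (no Luigi calls) that contrasts appending with rebuilding one combined file from the per-minute files. This is an illustration under stated assumptions, not something the documentation prescribes: the function names `append_chunk` and `rebuild_from_chunks` and the directory layout are hypothetical, and only the `data_test_*.json` naming comes from the question.

```python
import os
import tempfile
from pathlib import Path


def append_chunk(target: Path, chunk: str) -> None:
    """Not idempotent: calling this twice with the same chunk stores it twice."""
    with target.open("a") as f:
        f.write(chunk + "\n")


def rebuild_from_chunks(chunk_dir: Path, target: Path) -> None:
    """Idempotent: the combined file is always rebuilt from all per-minute files.

    Assumes (hypothetically) that the per-minute files live in chunk_dir and
    follow the data_test_*.json naming used in the question.
    """
    chunks = sorted(chunk_dir.glob("data_test_*.json"))
    # Write to a temporary file in the target's directory, then rename it into
    # place, so a reader never sees a half-written combined file.
    fd, tmp_name = tempfile.mkstemp(dir=target.parent)
    with os.fdopen(fd, "w") as tmp:
        for chunk in chunks:
            tmp.write(chunk.read_text())
    os.replace(tmp_name, target)
```

Running `rebuild_from_chunks` twice on the same inputs leaves the combined file unchanged, whereas running `append_chunk` twice duplicates the data. In a Luigi pipeline, the same rebuild logic could plausibly live in the `run()` of a downstream task whose `requires()` returns the per-minute tasks, so each task still writes its target exactly once.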

frb answered Nov 14 '22 23:11