I am working on a Luigi pipeline that checks if a manually created file exists and if so, continues with the next tasks:
import os

import luigi


class ExternalFileChecker(luigi.ExternalTask):
    task_namespace = 'MyTask'
    path = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget(os.path.join(self.path, 'externalfile.txt'))


class ProcessExternalFile(luigi.Task):
    task_namespace = 'MyTask'
    path = luigi.Parameter()

    def requires(self):
        return ExternalFileChecker(path=self.path)

    def output(self):
        dirname = self.path
        outfile = os.path.join(dirname, 'processedfile.txt')
        return luigi.LocalTarget(outfile)

    def run(self):
        # do processing
        pass


if __name__ == '__main__':
    path = r'D:\MyPath\luigi'
    luigi.run(['MyTask.ProcessExternalFile', '--path', path,
               '--worker-retry-external-tasks', '--scheduler-retry-delay', '20',
               '--worker-keep-alive'])
What I want is that luigi continues after I have created the manual file and pasted it in the path. When I do this, instead of finding the file and continuing with the task, it rechecks for a new task every few seconds:
DEBUG: There are no more tasks to run at this time
DEBUG: There are 2 pending tasks possibly being run by other workers
DEBUG: There are 2 pending tasks unique to this worker
DEBUG: Sleeping for 1.536391 seconds
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: There are 2 pending tasks possibly being run by other workers
DEBUG: There are 2 pending tasks unique to this worker
DEBUG: Sleeping for 5.669132 seconds
DEBUG: Asking scheduler for work...
DEBUG: Done
(...)
After a considerable amount of time (15-20 minutes or so), luigi will find the file and then it is able to continue as desired. What can I do to prevent this delay? I want luigi to continue as soon as the file exists.
A few things to keep in mind:
- A Luigi worker exits once it has no tasks left to run (unless you set keep_alive = True, in which case it will exit when there are no more pending tasks).
- Incomplete external tasks are only re-checked while the pipeline is running if you enable the retry_external_tasks config setting in the [worker] section.
I think what you are observing is something like this. Your pipeline is running, the task ProcessExternalFile fails, then you add the file, the task remains FAILED for the duration of retry_delay, then finally it becomes PENDING and the worker is given this task again, at which point it discovers the file and the task becomes COMPLETE.
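To make the settings involved more concrete, here is a minimal sketch that uses Python's standard configparser to build a luigi.cfg fragment mirroring the command-line flags from the question. The value 20 is taken from the question. That the [scheduler] retry_delay is the delay actually in play is an assumption; Luigi documents its default as 900 seconds, which would be consistent with the 15-20 minute wait. If a central scheduler (luigid) is used, it presumably needs to read this setting itself.

```python
import configparser
import io

# Sketch of a luigi.cfg equivalent to the flags passed in the question.
config = configparser.ConfigParser()
config["worker"] = {
    "keep_alive": "true",            # --worker-keep-alive
    "retry_external_tasks": "true",  # --worker-retry-external-tasks
}
config["scheduler"] = {
    "retry_delay": "20",             # --scheduler-retry-delay 20 (seconds)
}

# Render the configuration as text, as it would appear in luigi.cfg.
buffer = io.StringIO()
config.write(buffer)
luigi_cfg_text = buffer.getvalue()
print(luigi_cfg_text)
```

Writing luigi_cfg_text to a luigi.cfg file that both the worker and the scheduler pick up is left out here, since the file location depends on your setup.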
Whether this is desired behavior is up to you. If you want the file to be found quicker, you can shorten the retry interval (retry_delay). Or you can do an infinite while loop within the run method and check for the file periodically, and break out of the loop when found. You can also configure Luigi to disable retry logic altogether.
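The polling approach could look roughly like the sketch below. The helper wait_for_file and its parameters are hypothetical names, not part of Luigi. It also assumes the ExternalFileChecker requirement is dropped from ProcessExternalFile, since otherwise the worker would not start run() until the dependency is already complete.

```python
import os
import time


def wait_for_file(file_path, poll_seconds=5.0, timeout_seconds=None):
    """Block until file_path exists; return False if the timeout expires first."""
    deadline = None if timeout_seconds is None else time.monotonic() + timeout_seconds
    while True:
        if os.path.exists(file_path):
            return True
        if deadline is not None and time.monotonic() >= deadline:
            return False
        time.sleep(poll_seconds)


# Hypothetical use inside ProcessExternalFile.run(), assuming requires()
# no longer returns ExternalFileChecker:
#
#     def run(self):
#         wait_for_file(os.path.join(self.path, 'externalfile.txt'))
#         # do processing
```

With timeout_seconds left at None the loop waits indefinitely, which matches the "infinite while loop" described above; passing a timeout lets the task fail cleanly if the file never appears.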