
Python Luigi - Continue with External task when satisfied

Tags: python, luigi

I am working on a Luigi pipeline that checks if a manually created file exists and if so, continues with the next tasks:

import os

import luigi


class ExternalFileChecker(luigi.ExternalTask):
    task_namespace = 'MyTask'
    path = luigi.Parameter()

    def output(self):
        # The manually created file the pipeline waits for
        return luigi.LocalTarget(os.path.join(self.path, 'externalfile.txt'))


class ProcessExternalFile(luigi.Task):
    task_namespace = 'MyTask'
    path = luigi.Parameter()

    def requires(self):
        return ExternalFileChecker(path=self.path)

    def output(self):
        dirname = self.path
        outfile = os.path.join(dirname, 'processedfile.txt')
        return luigi.LocalTarget(outfile)

    def run(self):
        # do processing (omitted)
        pass


if __name__ == '__main__':
    path = r'D:\MyPath\luigi'
    luigi.run(['MyTask.ProcessExternalFile', '--path', path,
               '--worker-retry-external-tasks', '--scheduler-retry-delay', '20',
               '--worker-keep-alive'])

What I want is for Luigi to continue once I have created the file manually and placed it in the path. When I do this, instead of finding the file and continuing with the task, the worker keeps asking the scheduler for work every few seconds:

DEBUG: There are no more tasks to run at this time
DEBUG: There are 2 pending tasks possibly being run by other workers
DEBUG: There are 2 pending tasks unique to this worker
DEBUG: Sleeping for 1.536391 seconds
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: There are 2 pending tasks possibly being run by other workers
DEBUG: There are 2 pending tasks unique to this worker
DEBUG: Sleeping for 5.669132 seconds
DEBUG: Asking scheduler for work...
DEBUG: Done
(...)

After a considerable amount of time (15-20 minutes or so), Luigi finds the file and continues as desired. What can I do to prevent this delay? I want Luigi to continue as soon as the file exists.

Johan asked Jun 23 '16 10:06

1 Answer

A few things to keep in mind:

  1. A Luigi worker will not exit as long as at least one task is running (or, if keep_alive = True, as long as there are pending tasks; it exits only when none remain).
  2. There is retry logic for failed tasks; the default retry interval is 15 minutes.
  3. The retry logic works as follows. After the specified retry interval, the scheduler forgets the task's failure (identical to clicking the "forgive failures" button in the UI) and changes the task's status to PENDING. The next time the worker asks the scheduler for work, this task can be assigned to the worker.
  4. Incomplete external tasks count as FAILED and are subject to the retry logic.
  5. Retry behavior for external tasks is controlled by the retry_external_tasks config setting in the [worker] section.
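
The timing implied by the points above can be illustrated with a toy model. This is only a sketch of the described behavior, not Luigi's actual scheduler code: the names ToyTask, worker_checks, scheduler_forgives and seconds_until_noticed are hypothetical, and the 5-second polling step is an assumption chosen to resemble the log in the question.

```python
from dataclasses import dataclass
from typing import Optional

RETRY_DELAY = 900  # seconds; the 15-minute default retry interval


@dataclass
class ToyTask:
    """Hypothetical stand-in for an external task tracked by the scheduler."""
    status: str = "PENDING"
    failed_at: Optional[int] = None


def worker_checks(task, file_exists, now):
    """The worker only looks at the file when the task is PENDING."""
    if task.status != "PENDING":
        return
    if file_exists:
        task.status = "COMPLETE"
    else:
        task.status = "FAILED"
        task.failed_at = now


def scheduler_forgives(task, now, retry_delay):
    """After retry_delay seconds, a FAILED task becomes PENDING again."""
    if task.status == "FAILED" and now - task.failed_at >= retry_delay:
        task.status = "PENDING"
        task.failed_at = None


def seconds_until_noticed(file_created_at, retry_delay=RETRY_DELAY, poll=5):
    """Simulated seconds between file creation and the task completing."""
    task = ToyTask()
    now = 0
    while True:
        scheduler_forgives(task, now, retry_delay)
        worker_checks(task, file_exists=(now >= file_created_at), now=now)
        if task.status == "COMPLETE":
            return now - file_created_at
        now += poll
```

In this model, a file created 60 seconds after the first failed check goes unnoticed for another 840 seconds with the default delay (seconds_until_noticed(60) returns 840), whereas seconds_until_noticed(65, retry_delay=20) returns 15.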

I think what you are observing is something like this. Your pipeline is running, the external task ExternalFileChecker fails because its file is missing (so ProcessExternalFile cannot start), then you add the file, the task remains FAILED for the duration of retry_delay, then finally it becomes PENDING and the worker is given this task again, at which point it discovers the file and the task becomes COMPLETE.

Whether this is the desired behavior is up to you. If you want the file to be found sooner, you can shorten the retry interval. Alternatively, you can poll for the file in a loop inside the run method and break out of the loop once it is found. You can also configure Luigi to disable the retry logic altogether.
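
The polling option might look roughly like the helper below. It is a minimal sketch using only the standard library. The name wait_for_file and its poll_interval and timeout parameters are made up for illustration, and the timeout is an addition so that the loop is not truly infinite.

```python
import os
import time


def wait_for_file(path, poll_interval=5.0, timeout=None):
    """Block until `path` exists.

    Returns True once the file is found, or False if `timeout` seconds
    elapse first. With timeout=None the loop waits indefinitely.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while not os.path.exists(path):
        if deadline is not None and time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)
    return True
```

Inside run(), this could be called as wait_for_file(os.path.join(self.path, 'externalfile.txt')) before the processing step. Note that Luigi only calls run() once everything returned by requires() is complete, so a task that polls this way would presumably not list ExternalFileChecker as a requirement.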

user443854 answered Nov 14 '22 22:11
