
Workers dying early due to uneven work distribution in Luigi (2.6.1)

We are trying to run a simple pipeline distributed across a Docker Swarm cluster. The Luigi workers are deployed as replicated Docker services. They start successfully, but after a few seconds of asking luigi-server for work, they begin to die because no work was assigned to them, and all tasks end up assigned to a single worker.

We had to set keep_alive=True in the luigi.cfg of our workers to stop them from dying, but keeping workers around after the pipeline is done seems like a bad idea. Is there a way to control the work distribution?

Our test pipeline:

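import logging
import subprocess
import time

import luigi
from luigi import LocalTarget

# Logger name is an assumption; the original snippet does not show how `logger` was created.
logger = logging.getLogger('luigi-interface')


class RunAllTasks(luigi.Task):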

    tasks = luigi.IntParameter()
    sleep_time = luigi.IntParameter()

    def requires(self):
        for i in range(self.tasks):
            yield RunExampleTask(i, self.sleep_time)

    def run(self):
        with self.output().open('w') as f:
            f.write('All done!')

    def output(self):
        return LocalTarget('/data/RunAllTasks.txt')


class RunExampleTask(luigi.Task):

    number = luigi.IntParameter()
    sleep_time = luigi.IntParameter()

    @property
    def cmd(self):
        return """
               docker run --rm --name example_{number} hello-world
           """.format(number=self.number)

    def run(self):
        time.sleep(self.sleep_time)
        logger.debug(self.cmd)
        out = subprocess.check_output(self.cmd, stderr=subprocess.STDOUT, shell=True)
        logger.debug(out)
        with self.output().open('w') as f:
            f.write(str(out))

    def output(self):
        return LocalTarget('/data/{number}.txt'.format(number=self.number))


if __name__ == "__main__":
    luigi.run()
asked May 08 '17 15:05 by fcisneros


1 Answer

Your issue is the result of yielding a single requirement at a time. Instead, you want to yield all of them at once, as follows:

def requires(self):
    reqs = []
    for i in range(self.tasks):
        reqs.append(RunExampleTask(i, self.sleep_time))
    yield reqs
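
To make the difference between the two shapes concrete, here is a minimal plain-Python sketch with no Luigi dependency. The function names and the `(number, sleep_time)` tuples are hypothetical stand-ins for `RunExampleTask` instances. The sketch only shows what each generator hands back to its caller, and makes no claim about how the scheduler treats either shape.

```python
from typing import Iterator, List, Tuple

# Hypothetical stand-in for RunExampleTask(number, sleep_time); not a Luigi class.
Req = Tuple[int, int]


def requires_one_at_a_time(tasks: int, sleep_time: int) -> Iterator[Req]:
    """Shape used in the question: one requirement per yield."""
    for i in range(tasks):
        yield (i, sleep_time)


def requires_all_at_once(tasks: int, sleep_time: int) -> Iterator[List[Req]]:
    """Shape used in the answer: a single yield carrying the whole list."""
    yield [(i, sleep_time) for i in range(tasks)]


if __name__ == "__main__":
    # Three separate items, one per yield.
    print(list(requires_one_at_a_time(3, 5)))
    # One item, which is itself the list of all three requirements.
    print(list(requires_all_at_once(3, 5)))
```

The first generator produces three items and the second produces a single list containing the same three, so the caller sees every requirement in one step.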
answered Oct 20 '22 22:10 by Maksim