We are trying to run a simple pipeline distributed across a Docker Swarm cluster. The Luigi workers are deployed as replicated Docker services. They start successfully, but after a few seconds of asking the luigi-server for work they die because no work is assigned to them, and all tasks end up assigned to a single worker.
We had to set keep_alive=True in the luigi.cfg of our workers to force them not to die, but keeping workers around after the pipeline is done seems like a bad idea. Is there a way to control the work distribution?
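For reference, this is roughly what we have in the workers' luigi.cfg right now (a minimal sketch; the scheduler host name is just a placeholder for our swarm service name):

[core]
default-scheduler-host=luigi-server
default-scheduler-port=8082

[worker]
keep_alive=True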
Our test pipeline:
import logging
import subprocess
import time

import luigi
from luigi import LocalTarget

logger = logging.getLogger('luigi-interface')


class RunAllTasks(luigi.Task):

    tasks = luigi.IntParameter()
    sleep_time = luigi.IntParameter()

    def requires(self):
        for i in range(self.tasks):
            yield RunExampleTask(i, self.sleep_time)

    def run(self):
        with self.output().open('w') as f:
            f.write('All done!')

    def output(self):
        return LocalTarget('/data/RunAllTasks.txt')


class RunExampleTask(luigi.Task):

    number = luigi.IntParameter()
    sleep_time = luigi.IntParameter()

    @property
    def cmd(self):
        # Each task just runs a throwaway hello-world container
        return """
        docker run --rm --name example_{number} hello-world
        """.format(number=self.number)

    def run(self):
        time.sleep(self.sleep_time)
        logger.debug(self.cmd)
        out = subprocess.check_output(self.cmd, stderr=subprocess.STDOUT, shell=True)
        logger.debug(out)
        with self.output().open('w') as f:
            f.write(str(out))

    def output(self):
        return LocalTarget('/data/{number}.txt'.format(number=self.number))


if __name__ == "__main__":
    luigi.run()
Your issue is the result of yielding a single requirement at a time; instead you want to yield all of them at once, as follows:
def requires(self):
    reqs = []
    for i in range(self.tasks):
        reqs.append(RunExampleTask(i, self.sleep_time))
    yield reqs
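With that change the scheduler knows about every RunExampleTask up front, so idle workers can pick tasks up instead of exiting. As a rough example of how a worker could then be started against the central scheduler (the module name, parameter values and scheduler host below are placeholders for your setup):

luigi --module run_all_tasks RunAllTasks --tasks 20 --sleep-time 10 \
      --scheduler-host luigi-server --workers 2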