
How to run a luigi task with spark-submit and pyspark

I have a luigi Python task that uses some pyspark libraries. I would like to submit this task to Mesos with spark-submit. What should I do to run it? Below is my code skeleton:

import luigi
from luigi.contrib.spark import SparkSubmitTask
from pyspark.sql import functions as F
from pyspark import SparkContext

class myClass(SparkSubmitTask):
  # date = luigi.DateParameter()

  def __init__(self, date):
    self.date = date # date is datetime.date.today().isoformat()

  def output(self):
    pass  # body omitted in this skeleton

  def input(self):
    pass  # body omitted in this skeleton

  def run(self):
    # Some functions are using pyspark libs
    pass

if __name__ == "__main__":
  luigi.run()

Without luigi, I submit this task with the following command line:

/opt/spark/bin/spark-submit --master mesos://host:port --deploy-mode cluster --total-executor-cores 1 --driver-cores 1 --executor-memory 1G --driver-memory 1G my_module.py

Now the problem is how to spark-submit the luigi task when it is started through the luigi command line, such as:

luigi --module my_module myClass --local-scheduler --date 2016-01

One more question: if my_module.py has a required task that must finish first, do I need to do anything extra for it, or can I keep the same settings as in the current command line?

I would really appreciate any hints or suggestions. Thanks very much.

zuhakasa asked Mar 11 '23 06:03


1 Answer

Luigi has some template tasks. One of them is called PySparkTask. You can inherit from this class and override its properties:

https://github.com/spotify/luigi/blob/master/luigi/contrib/spark.py

I haven't tested it, but based on my experience with luigi I would try this:

import luigi
from luigi.contrib.spark import PySparkTask

import my_module


class MyPySparkTask(PySparkTask):
    date = luigi.DateParameter()

    @property
    def name(self):
        return self.__class__.__name__

    @property
    def master(self):
        return 'mesos://host:port'

    @property
    def deploy_mode(self):
        return 'cluster'

    @property
    def total_executor_cores(self):
        return 1

    @property
    def driver_cores(self):
        return 1

    @property
    def executor_memory(self):
        # Memory sizes are strings such as '1G', not bare literals
        return '1G'

    @property
    def driver_memory(self):
        return '1G'

    def main(self, sc, *args):
        # sc is the SparkContext that luigi creates for the job
        my_module.run(sc)

    def app_options(self):
        # Extra arguments handed to the application
        return [self.date.isoformat()]

Then you can run it with a full date, since luigi.DateParameter expects the YYYY-MM-DD format: luigi --module task_module MyPySparkTask --local-scheduler --date 2016-01-01
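To see how the overridden properties line up with the flags in the original spark-submit command, here is a minimal stand-alone sketch. It does not import luigi. FLAGS and build_spark_submit_args are hypothetical names made up for this illustration, and luigi's own command assembly may differ in ordering and detail.

```python
# Illustration only: shows which property corresponds to which
# spark-submit flag. build_spark_submit_args is a hypothetical helper,
# not part of luigi.

FLAGS = [
    ("master", "--master"),
    ("deploy_mode", "--deploy-mode"),
    ("total_executor_cores", "--total-executor-cores"),
    ("driver_cores", "--driver-cores"),
    ("executor_memory", "--executor-memory"),
    ("driver_memory", "--driver-memory"),
]


def build_spark_submit_args(props, app, app_options=()):
    """Return the argument list for a spark-submit call."""
    args = ["/opt/spark/bin/spark-submit"]
    for name, flag in FLAGS:
        value = props.get(name)
        if value is not None:  # skip properties that were not set
            args += [flag, str(value)]
    # The application file comes after the flags, then its own options
    return args + [app] + [str(option) for option in app_options]


props = {
    "master": "mesos://host:port",
    "deploy_mode": "cluster",
    "total_executor_cores": 1,
    "driver_cores": 1,
    "executor_memory": "1G",
    "driver_memory": "1G",
}

command = build_spark_submit_args(props, "my_module.py")
print(" ".join(command))
```

With these values the printed line matches the spark-submit command from the question, which is a quick way to check that no flag was dropped when moving the settings into the task class.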

There is also an option to set the properties in a client.cfg file in order to make them the default values for other PySparkTasks:

[spark]
master: mesos://host:port
deploy-mode: cluster
total-executor-cores: 1
driver-cores: 1
executor-memory: 1G
driver-memory: 1G
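
A quick way to sanity-check such a section before running anything is to parse it with Python's standard configparser, which reads the same INI syntax. This is only a sketch: it assumes hyphenated key names that mirror the spark-submit flags, the config text is inlined as a string instead of being read from client.cfg, and it does not go through luigi's own configuration loader.

```python
import configparser

# Inlined copy of the [spark] section; in practice this would live
# in the luigi config file. Hyphenated keys are an assumption here.
CFG_TEXT = """
[spark]
master: mesos://host:port
deploy-mode: cluster
total-executor-cores: 1
driver-cores: 1
executor-memory: 1G
driver-memory: 1G
"""

parser = configparser.ConfigParser()
parser.read_string(CFG_TEXT)

# All values come back as strings; only the first ':' on a line
# separates key from value, so the URL stays intact.
spark = dict(parser["spark"])

for key, value in spark.items():
    print(f"{key} = {value}")
```

If a key is misspelled, it simply shows up under the wrong name in this listing, which is easier to spot here than in a failed cluster submission.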

ayun12 answered Mar 13 '23 20:03