I have a Luigi Python task that uses some PySpark libs. Now I would like to submit this task to Mesos with spark-submit. What should I do to run it? Below is my code skeleton:
import luigi
from pyspark.sql import functions as F
from pyspark import SparkContext
from luigi.contrib.spark import SparkSubmitTask

class myClass(SparkSubmitTask):
    # date = luigi.DateParameter()
    def __init__(self, date):
        self.date = date  # date is datetime.date.today().isoformat()
    def output(self):
        ...
    def input(self):
        ...
    def run(self):
        # Some functions here use pyspark libs
        ...

if __name__ == "__main__":
    luigi.run()
Without Luigi, I'm submitting this task with the following command line:
/opt/spark/bin/spark-submit --master mesos://host:port --deploy-mode cluster --total-executor-cores 1 --driver-cores 1 --executor-memory 1G --driver-memory 1G my_module.py
Now the problem is how I can spark-submit the Luigi task, which is normally run with a Luigi command line such as:
luigi --module my_module myClass --local-scheduler --date 2016-01
One more question: if my_module.py has a required task that must finish first, do I need to do anything extra for it, or is it enough to set it up the same way as the current command line?
I'd really appreciate any hints or suggestions. Thanks very much.
Luigi has some template Tasks. One of them is called PySparkTask. You can inherit from this class and override its properties:
https://github.com/spotify/luigi/blob/master/luigi/contrib/spark.py.
I haven't tested it, but based on my experience with Luigi I would try this:
import luigi
from luigi.contrib.spark import PySparkTask

import my_module

class MyPySparkTask(PySparkTask):
    date = luigi.DateParameter()

    @property
    def name(self):
        return self.__class__.__name__

    @property
    def master(self):
        return 'mesos://host:port'

    @property
    def deploy_mode(self):
        return 'cluster'

    @property
    def total_executor_cores(self):
        return 1

    @property
    def driver_cores(self):
        return 1

    @property
    def executor_memory(self):
        return '1G'

    @property
    def driver_memory(self):
        return '1G'

    def main(self, sc, *args):
        my_module.run(sc)

    def app_options(self):
        return [self.date]
Then you can run it with: luigi --module task_module MyPySparkTask --local-scheduler --date 2016-01
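For main() above to work, my_module.py needs to expose some entry point (run(sc) in this example) that takes the SparkContext that PySparkTask hands it and does the actual PySpark work. The name run and everything inside it are just placeholders for your own code; a minimal sketch could look like:

# my_module.py -- hypothetical entry point called from MyPySparkTask.main()
from pyspark.sql import SQLContext

def run(sc):
    # sc is the SparkContext that PySparkTask creates for you,
    # so don't create another one here.
    sqlContext = SQLContext(sc)
    df = sqlContext.read.json('hdfs:///path/to/input.json')  # example input path
    df.groupBy('some_column').count().show()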
There is also an option to set the properties in a client.cfg file in order to make them the default values for other PySparkTasks:
[spark]
master: mesos://host:port
deploy-mode: cluster
total-executor-cores: 1
driver-cores: 1
executor-memory: 1G
driver-memory: 1G
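Once those defaults are in the config file, the per-job property overrides above are no longer needed and the task boils down to its own parameters and logic. A minimal sketch under that assumption:

import luigi
from luigi.contrib.spark import PySparkTask

import my_module

class MyPySparkTask(PySparkTask):
    # master, deploy mode, cores and memory now come from the [spark]
    # section of the config file instead of property overrides.
    date = luigi.DateParameter()

    def main(self, sc, *args):
        my_module.run(sc)

    def app_options(self):
        return [self.date]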