Hello people of the Earth!
I'm using Airflow to schedule and run Spark tasks.
All I have found so far are Python DAGs that Airflow can manage.
DAG example:
spark_count_lines.py
import logging
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

args = {
    'owner': 'airflow',
    'start_date': datetime(2016, 4, 17),
    'provide_context': True,
}

dag = DAG(
    'spark_count_lines',
    start_date=datetime(2016, 4, 17),
    schedule_interval='@hourly',
    default_args=args,
)

def run_spark(**kwargs):
    # Count the lines of a text file with a local SparkContext.
    import pyspark
    sc = pyspark.SparkContext()
    df = sc.textFile('file:///opt/spark/current/examples/src/main/resources/people.txt')
    logging.info('Number of lines in people.txt = {0}'.format(df.count()))
    sc.stop()

t_main = PythonOperator(
    task_id='call_spark',
    dag=dag,
    python_callable=run_spark,
)
The problem is that I'm not good at Python and I have some tasks written in Java. My question is: how do I run a Spark Java jar from a Python DAG? Or maybe there is another way to do it? I found spark-submit: http://spark.apache.org/docs/latest/submitting-applications.html
But I don't know how to connect everything together. Maybe someone has used it before and has a working example. Thank you for your time!
If your Spark cluster manager provides a scheduling UI: click Schedule Application, then click Next. In the Schedule Spark Application dialog, write the spark-submit command, much as you would to submit applications from the spark-submit command line, and select the Spark instance group to which you want to submit the Spark batch application.
You should be able to use BashOperator. Keeping the rest of your code as is, import the required operator class and the os module:
from airflow.operators.bash_operator import BashOperator
import os
set the required paths so the shell spawned by BashOperator can find spark-submit:
os.environ['SPARK_HOME'] = '/path/to/spark/root'
os.environ['PATH'] = os.path.join(os.environ['SPARK_HOME'], 'bin') + os.pathsep + os.environ['PATH']
and add the operator:
spark_task = BashOperator(
task_id='spark_java',
bash_command='spark-submit --class {{ params.class }} {{ params.jar }}',
params={'class': 'MainClassName', 'jar': '/path/to/your.jar'},
dag=dag
)
You can easily extend this to provide additional arguments using Jinja templates.
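For instance, here is a minimal sketch of the same task with a couple of extra spark-submit options passed through params; the master URL, memory setting, and application argument below are placeholders, not values from your setup:
spark_task_extended = BashOperator(
    task_id='spark_java_extended',
    bash_command=(
        'spark-submit --master {{ params.master }} '
        '--driver-memory {{ params.driver_memory }} '
        '--class {{ params.class }} {{ params.jar }} {{ params.app_arg }}'
    ),
    params={
        'master': 'yarn',              # placeholder: your cluster master
        'driver_memory': '2g',         # placeholder: driver memory setting
        'class': 'MainClassName',
        'jar': '/path/to/your.jar',
        'app_arg': 'some-argument',    # placeholder: argument passed to your jar
    },
    dag=dag
)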
You can of course adjust this for a non-Spark scenario by replacing bash_command with a template suitable for your case, for example:
bash_command = 'java -jar {{ params.jar }}'
and adjusting params accordingly.
As of version 1.8 (released today), Airflow has
SparkSQLHook code - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_sql_hook.py
SparkSubmitHook code - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_submit_hook.py
Notice that these two new Spark operators/hooks are in the "contrib" package as of version 1.8, so they are not (well) documented.
So you can use SparkSubmitOperator to submit your Java code for Spark execution.
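A minimal sketch of what that could look like for a Java jar, assuming a spark_default connection is configured in Airflow; the task id, class name, jar path, and arguments are placeholders:
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

spark_submit_task = SparkSubmitOperator(
    task_id='spark_submit_java',
    conn_id='spark_default',            # Spark connection defined in Airflow
    application='/path/to/your.jar',    # the jar to submit (placeholder path)
    java_class='MainClassName',         # main class of your Java application
    application_args=['arg1', 'arg2'],  # placeholder arguments passed to the jar
    dag=dag
)
Under the hood the operator builds a spark-submit call, so spark-submit still has to be resolvable on the worker that runs the task (via the connection's settings or PATH).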