I'm new to Spark and Airflow and am trying to understand how I can use Airflow to kick off a job along with the parameters the job needs. I use the spark-submit command below to run a specific job for specific dates on an edge node:
EXECUTORS_MEM=4G
EXECUTORS_NUM=300
STARTDAY=20180401
ENDDAY=20180401
QUEUE=m
jobname=x
/home/spark/spark-2.1.0-bin-hadoop2.6/bin/spark-submit --verbose --master yarn --deploy-mode client --num-executors $EXECUTORS_NUM --executor-memory $EXECUTORS_MEM --executor-cores 1 --driver-memory 8G --queue $QUEUE --class test.core.Driver --jars $JARS2 abc.jar --config=/a/b/c/test.config --appName=abc --sparkMaster=yarnclient --job=$jobname --days=$STARTDAY,$ENDDAY
So can you please let me know whether creating a .py file similar to the code below is the way to run the job in Airflow? Is this how you're supposed to run a job and pass parameters?
How do I pass parameters like I did when launching the job on the edge node?
If I automate the job to run daily, I would like the start date to be "t-7", so if today's date is 4/20/2018 the start date passed to the job has to be 4/13/2018. How do I achieve that?
###############.py file example ##############
**********************************************
import os
import sys

from airflow.operators.bash_operator import BashOperator

os.environ['SPARK_HOME'] = '/home/spark/spark-2.1.0-bin-hadoop2.6'
sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin'))
and add the operator:
spark_task = BashOperator(
    task_id='spark_java',
    bash_command='spark-submit --class test.core.Driver abc.jar',
    params={'EXECUTORS_MEM': '4G', 'EXECUTORS_NUM': '300', 'QUEUE': 'm', 'jobname': 'x'},
    dag=dag
)
################### EOF ######################
**********************************************
Is it possible to manually pass a specific start and end date for the job to run?
from airflow import DAG
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.utils import timezone
DEFAULT_DATE = timezone.datetime(2017, 1, 1)
args = {
    'owner': 'airflow',
    'start_date': DEFAULT_DATE
}
dag = DAG('test_dag_id', default_args=args)
_config = {
    'config': '/a/b/c/d/prod.config',
    'master': 'yarn',
    'deploy-mode': 'client',
    'sparkMaster': 'yarnclient',
    'class': 'core.Driver',
    'driver_classpath': 'parquet.jar',
    'jars': '/a/b/c/d/test.jar',
    'total_executor_cores': 4,
    'executor_cores': 1,
    'EXECUTORS_MEM': '8G',
    'EXECUTORS_NUM': 500,
    'executor-cores': '1',
    'driver-memory': '8G',
    'JOB_NAME': ' ',
    'QUEUE': ' ',
    'verbose': ' ',
    'start_date': ' ',
    'end_date': ' '
}
operator = SparkSubmitOperator(
    task_id='spark_submit_job',
    dag=dag,
    **_config
)
On the Spark download page you can get the tgz file and unzip it on the machine that hosts Airflow. Set SPARK_HOME in your .bashrc and add it to the system PATH. Finally, add the pyspark package to the environment where Airflow runs.
The start date is something you set once, and it's intended to be set absolutely, not relative to the current day.
Like this:
import datetime

from airflow import DAG

dag = DAG(
    ...
    start_date=datetime.datetime(2018, 4, 13),
)
It's possible to set the start date as a delta like datetime.timedelta(days=7), but this is not recommended, since it would change the start date if you were to delete the DAG (including all references such as DAG runs, task instances, etc.) and run it again from scratch on another day. The best practice is for DAGs to be idempotent.
For submitting a job to Spark, there is a SparkSubmitOperator that wraps the spark-submit shell command. That would be the preferred option. That said, you can do basically anything with a BashOperator, so that's a workable alternative too; a sketch of that approach follows below.
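For instance, here is a minimal sketch of driving the original spark-submit command from a BashOperator. It reuses the flags from the question (note the --num-executors spelling) and uses Airflow's Jinja macros to build a rolling "t-7" window in yyyymmdd form; the DAG id, schedule, and param names are only illustrative.
# Sketch, not a drop-in script: paths, queue name, and job name come
# from the question and may need adjusting to your environment.
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils import timezone

dag = DAG(
    'spark_bash_example',
    default_args={'owner': 'airflow', 'start_date': timezone.datetime(2018, 4, 13)},
    schedule_interval='@daily',
)

# params fills in the static values; the Jinja macros compute the rolling
# "t-7" start day and the run's execution day at render time.
spark_task = BashOperator(
    task_id='spark_java',
    bash_command=(
        'spark-submit --master yarn --deploy-mode client '
        '--num-executors {{ params.executors_num }} '
        '--executor-memory {{ params.executors_mem }} '
        '--executor-cores 1 --driver-memory 8G '
        '--queue {{ params.queue }} '
        '--class test.core.Driver abc.jar '
        '--job={{ params.jobname }} '
        '--days={{ macros.ds_format(macros.ds_add(ds, -7), "%Y-%m-%d", "%Y%m%d") }},'
        '{{ macros.ds_format(ds, "%Y-%m-%d", "%Y%m%d") }}'
    ),
    params={'executors_num': '300', 'executors_mem': '4G', 'queue': 'm', 'jobname': 'x'},
    dag=dag,
)
With a daily schedule, ds is the execution date of each run, so the seven-day window slides forward automatically without ever touching start_date.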
The linked code for SparkSubmitOperator is well documented for each argument it accepts. You can point to your .jar file with the application kwarg and pass Spark config with conf. There are also kwargs for passing info like executor cores and memory. You can use application_args to pass a list of arbitrary args through to your Spark application.
Here's an example of using SparkSubmitOperator, copied and slightly simplified from the unit tests for it in Airflow. Note that it uses ** to explode the kwargs from a dict in order to initialize the Spark operator, but that's just how the test is structured. You could just as easily pass each config value as a kwarg.
from airflow import DAG
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.utils import timezone
DEFAULT_DATE = timezone.datetime(2017, 1, 1)
args = {
    'owner': 'airflow',
    'start_date': DEFAULT_DATE
}
dag = DAG('test_dag_id', default_args=args)
_config = {
    'conf': {
        'parquet.compression': 'SNAPPY'
    },
    'files': 'hive-site.xml',
    'py_files': 'sample_library.py',
    'driver_classpath': 'parquet.jar',
    'jars': 'parquet.jar',
    'packages': 'com.databricks:spark-avro_2.11:3.2.0',
    'exclude_packages': 'org.bad.dependency:1.0.0',
    'repositories': 'http://myrepo.org',
    'total_executor_cores': 4,
    'executor_cores': 4,
    'executor_memory': '22g',
    'keytab': 'privileged_user.keytab',
    'principal': 'user/spark@HOST',
    'name': '{{ task_instance.task_id }}',
    'num_executors': 10,
    'verbose': True,
    'application': 'test_application.py',
    'driver_memory': '3g',
    'java_class': 'com.foo.bar.AppMain',
    'application_args': [
        '-f', 'foo',
        '--bar', 'bar',
        '--start', '{{ macros.ds_add(ds, -1) }}',
        '--end', '{{ ds }}',
        '--with-spaces', 'args should keep embedded spaces',
    ]
}
operator = SparkSubmitOperator(
    task_id='spark_submit_job',
    dag=dag,
    **_config
)
Source: https://github.com/apache/incubator-airflow/blob/f520990fe0b7a70f80bec68cb5c3f0d41e3e984d/tests/contrib/operators/test_spark_submit_operator.py
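To tie this back to the question, here is a hedged sketch of how your spark-submit command might map onto SparkSubmitOperator kwargs. The master and deploy mode come from the Spark connection (conn_id, 'spark_default' by default) rather than from operator kwargs, the YARN queue is passed through conf as spark.yarn.queue, and the --config/--appName/--job/--days application flags simply mirror your command; treat the values and paths as placeholders.
from airflow import DAG
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.utils import timezone

dag = DAG(
    'spark_submit_example',
    default_args={'owner': 'airflow', 'start_date': timezone.datetime(2018, 4, 13)},
    schedule_interval='@daily',
)

submit_job = SparkSubmitOperator(
    task_id='spark_submit_job',
    conn_id='spark_default',         # this connection holds master / deploy-mode
    application='/path/to/abc.jar',  # placeholder path to the application jar
    java_class='test.core.Driver',
    conf={'spark.yarn.queue': 'm'},  # YARN queue goes through Spark conf
    num_executors=300,
    executor_cores=1,
    executor_memory='4G',
    driver_memory='8G',
    verbose=True,
    application_args=[
        '--config=/a/b/c/test.config',
        '--appName=abc',
        '--sparkMaster=yarnclient',
        '--job=x',
        # rolling "t-7" window in yyyymmdd form, like --days=$STARTDAY,$ENDDAY
        '--days={{ macros.ds_format(macros.ds_add(ds, -7), "%Y-%m-%d", "%Y%m%d") }},'
        '{{ macros.ds_format(ds, "%Y-%m-%d", "%Y%m%d") }}',
    ],
    dag=dag,
)
The extra jars from your command ($JARS2) would go through the jars kwarg shown in the test example above, once you know the actual paths.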