
Apache Airflow - automation - how to run a spark-submit job with parameters

I'm new to Spark and Airflow, and I'm trying to understand how I can use Airflow to kick off a job along with the parameters the job needs. On the edge node I use the spark-submit command below to run a specific job for specific dates:

EXECUTORS_MEM=4G
EXECUTORS_NUM=300
STARTDAY=20180401
ENDDAY=20180401
QUEUE=m
jobname=x

/home/spark/spark-2.1.0-bin-hadoop2.6/bin/spark-submit --verbose --master yarn --deploy-mode client --num-executors $EXECUTORS_NUM --executor-memory $EXECUTORS_MEM --executor-cores 1 --driver-memory 8G --queue $QUEUE --class test.core.Driver --jars $JARS2 abc.jar --config=/a/b/c/test.config --appName=abc --sparkMaster=yarnclient --job=$jobname --days=$STARTDAY,$ENDDAY

So can you please let me know whether creating a .py file similar to the code below is how I should run the job in Airflow? Is this how you're supposed to run a job and pass parameters?

How do I pass parameters the way I did when launching the job on the edge node?

If I automate the job to run daily, I would like the start date to be "t-7", so if today's date is 4/20/2018, the start date passed to the job should be 4/13/2018. How do I achieve that?

###############.py file example ##############
**********************************************

    from airflow.operators.bash_operator import BashOperator

    import os
    import sys

    os.environ['SPARK_HOME'] = '/home/spark/spark-2.1.0-bin-hadoop2.6'
    sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin'))

and add the operator:

    spark_task = BashOperator(
        task_id='spark_java',
        bash_command='spark-submit --class test.core.Driver abc.jar',
        params={'EXECUTORS_MEM': '4G', 'EXECUTORS_NUM': '300', 'QUEUE': 'm', 'jobname': 'x'},
        dag=dag
    )

################### EOF ######################
**********************************************

New .py file - please correct me if anything is wrong

  • How do I pass the params needed to run a Spark install that lives in a different path?
  • How do I pass a jar that is in a different path?
  • Is the way shown below the right way to pass the parameters?
  • Is it possible to manually pass a certain start and end date for the job to run?

    from airflow import DAG
    
    from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
    from airflow.utils import timezone
    
    
    DEFAULT_DATE = timezone.datetime(2017, 1, 1)
    
    args = {
        'owner': 'airflow',
        'start_date': DEFAULT_DATE
    }
    dag = DAG('test_dag_id', default_args=args)
    
    _config = {
        'config': '/a/b/c/d/prod.config',
        'master': 'yarn',
        'deploy-mode': 'client',
        'sparkMaster': 'yarnclient',
        'class': 'core.Driver',
        'driver_classpath': 'parquet.jar',
        'jars': '/a/b/c/d/test.jar',
        'total_executor_cores': 4,
        'executor_cores': 1,
        'EXECUTORS_MEM': '8G',
        'EXECUTORS_NUM': 500,
        'executor-cores': '1',
        'driver-memory': '8G',
        'JOB_NAME': ' ',
        'QUEUE': ' ',
        'verbose': ' ',
        'start_date': ' ',
        'end_date': ' '
    }
    
    operator = SparkSubmitOperator(
        task_id='spark_submit_job',
        dag=dag,
        **_config
    )
    
Asked Apr 21 '18 by Nick



1 Answer

The start date is something you set once, and it's intended to be set absolutely, not relative to the current day.

Like this:

import datetime

from airflow import DAG

dag = DAG(
    ...
    start_date=datetime.datetime(2018, 4, 13),
)

It's possible to set start date as a delta like datetime.timedelta(days=7), but this is not recommended, since it would change the start date if you were to delete the DAG (including all references such as DAG runs, task instances, etc) and run it again from scratch on another day. The best practice is for DAGs to be idempotent.
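
To get the "t-7" behaviour from the question, the usual pattern is to keep start_date fixed and compute the window at run time with Airflow's templating macros instead. Here is a minimal sketch, assuming a daily schedule (the DAG id and the echo task are placeholders, not part of the question):

import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    'daily_spark_job',                          # placeholder DAG id
    start_date=datetime.datetime(2018, 4, 13),  # fixed, absolute start date
    schedule_interval='@daily',
)

# ds is the run's execution date as YYYY-MM-DD and macros.ds_add(ds, -7) is
# seven days earlier, so every run gets its own "t-7" start date without the
# DAG's start_date ever moving. ds_nodash gives the YYYYMMDD form.
show_window = BashOperator(
    task_id='show_window',
    bash_command='echo "start={{ macros.ds_add(ds, -7) }} end={{ ds }}"',
    dag=dag,
)

Any templated operator field accepts these macros, so the same expressions can go straight into a bash_command or into application_args further below.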

For submitting a job to Spark, there is a SparkSubmitOperator that wraps the spark-submit shell command. That would be the preferred option. That said, you can do basically anything with a BashOperator, so that's a workable alternative too.
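
As a sketch of the BashOperator route (the task id is made up; the paths, class, and values are copied from the question, with some flags trimmed), the shell variables become params and are templated into the command:

from airflow.operators.bash_operator import BashOperator

# dag is assumed to be defined as in the snippet above
spark_task = BashOperator(
    task_id='spark_submit_edge_style',
    bash_command=(
        '/home/spark/spark-2.1.0-bin-hadoop2.6/bin/spark-submit '
        '--master yarn --deploy-mode client '
        '--num-executors {{ params.executors_num }} '
        '--executor-memory {{ params.executors_mem }} '
        '--executor-cores 1 --driver-memory 8G '
        '--queue {{ params.queue }} '
        '--class test.core.Driver abc.jar '
        '--job={{ params.jobname }} '
        # dates are templated per run; use ds_nodash / macros.ds_format if the
        # job needs the YYYYMMDD form from the question
        '--days={{ macros.ds_add(ds, -7) }},{{ ds }}'
    ),
    params={
        'executors_mem': '4G',
        'executors_num': '300',
        'queue': 'm',
        'jobname': 'x',
    },
    dag=dag,
)

Because bash_command is a templated string, this also covers pointing at a Spark install in a different path: just call that path's spark-submit binary in the command.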

The linked code for SparkSubmitOperator is well documented for each argument it accepts. You can point to your .jar file with the application kwarg, pass Spark config with conf. There are also kwargs for passing info like executor cores and memory. You can use application_args to pass a list of arbitrary args through to your Spark application.

Here's an example of using SparkSubmitOperator copied and slightly simplified from the unit tests for it in Airflow. Note that it uses ** to explode the kwargs from a dict to initialize the Spark operator, but that's just how the test is structured. You could just as easily pass each config value as a kwarg.

from airflow import DAG

from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.utils import timezone


DEFAULT_DATE = timezone.datetime(2017, 1, 1)

args = {
    'owner': 'airflow',
    'start_date': DEFAULT_DATE
}
dag = DAG('test_dag_id', default_args=args)

_config = {
    'conf': {
        'parquet.compression': 'SNAPPY'
    },
    'files': 'hive-site.xml',
    'py_files': 'sample_library.py',
    'driver_classpath': 'parquet.jar',
    'jars': 'parquet.jar',
    'packages': 'com.databricks:spark-avro_2.11:3.2.0',
    'exclude_packages': 'org.bad.dependency:1.0.0',
    'repositories': 'http://myrepo.org',
    'total_executor_cores': 4,
    'executor_cores': 4,
    'executor_memory': '22g',
    'keytab': 'privileged_user.keytab',
    'principal': 'user/spark@HOST',
    'name': '{{ task_instance.task_id }}',
    'num_executors': 10,
    'verbose': True,
    'application': 'test_application.py',
    'driver_memory': '3g',
    'java_class': 'com.foo.bar.AppMain',
    'application_args': [
        '-f', 'foo',
        '--bar', 'bar',
        '--start', '{{ macros.ds_add(ds, -1)}}',
        '--end', '{{ ds }}',
        '--with-spaces', 'args should keep embdedded spaces',
    ]
}

operator = SparkSubmitOperator(
    task_id='spark_submit_job',
    dag=dag,
    **_config
)

Source: https://github.com/apache/incubator-airflow/blob/f520990fe0b7a70f80bec68cb5c3f0d41e3e984d/tests/contrib/operators/test_spark_submit_operator.py
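
To tie this back to the spark-submit command in the question, here is a rough sketch of the mapping (the jar path, class name, and application args are copied from the question; the task and DAG ids are made up; --master and --deploy-mode are configured on the Airflow Spark connection rather than as operator kwargs, and the queue is passed here as a plain Spark conf):

from airflow import DAG
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.utils import timezone

dag = DAG(
    'spark_job_daily',                  # placeholder DAG id
    start_date=timezone.datetime(2018, 4, 13),
    schedule_interval='@daily',
)

spark_task = SparkSubmitOperator(
    task_id='run_test_core_driver',
    conn_id='spark_default',            # master/deploy-mode/spark-home live on this connection
    application='abc.jar',              # path to the jar, wherever the worker can read it
    java_class='test.core.Driver',
    jars='/a/b/c/d/test.jar',           # extra jars, like $JARS2 in the question
    num_executors=300,
    executor_cores=1,
    executor_memory='4G',
    driver_memory='8G',
    conf={'spark.yarn.queue': 'm'},     # --queue m
    verbose=True,
    application_args=[
        '--config=/a/b/c/test.config',
        '--appName=abc',
        '--sparkMaster=yarnclient',
        '--job=x',
        # templated t-7 window, rendered as YYYYMMDD to match STARTDAY/ENDDAY
        '--days={{ macros.ds_format(macros.ds_add(ds, -7), "%Y-%m-%d", "%Y%m%d") }},{{ ds_nodash }}',
    ],
    dag=dag,
)

This is only a sketch against the Airflow 1.x contrib operator, not a drop-in config; double-check each kwarg against the operator's signature in your Airflow version.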

Answered Sep 29 '22 by Taylor D. Edmiston