I have a Spark job that runs via a Kubernetes pod. Until now I was using a YAML file to run my jobs manually. Now I want to schedule my Spark jobs via Airflow. This is the first time I am using Airflow, and I am unable to figure out how I can add my YAML file to Airflow. From what I have read, I can schedule my jobs via a DAG in Airflow. A DAG example is this:
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta

args = {'owner': 'test', 'start_date': datetime(2019, 4, 3), 'retries': 2, 'retry_delay': timedelta(minutes=1)}

dag = DAG('test_dag', default_args=args, catchup=False)

def print_text1():
    print("hell-world1")

def print_text():
    print('Hello-World2')

t1 = PythonOperator(task_id='multitask1', python_callable=print_text1, dag=dag)
t2 = PythonOperator(task_id='multitask2', python_callable=print_text, dag=dag)
t1 >> t2
In this case the above methods will get executed one after the other once I trigger the DAG. Now, if I want to run a spark-submit job instead, what should I do? I am using Spark 2.4.4.
Airflow has a concept of operators, which represent Airflow tasks. In your example the PythonOperator is used, which simply executes Python code and is most probably not the one you are interested in, unless you submit the Spark job within Python code. There are several operators that you can make use of:
- BashOperator, which executes an arbitrary bash command, so you can call kubectl or spark-submit with it directly
- SparkSubmitOperator, which wraps spark-submit for you (a sketch follows below)
- KubernetesPodOperator, which launches a Kubernetes pod for the task, much like your existing YAML file does (see the second sketch below)
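For example, a SparkSubmitOperator task could be added to the same DAG roughly like this. This is only a minimal sketch, assuming Airflow 1.10.x (where the operator lives under airflow.contrib; in Airflow 2 it moved to the apache-spark provider), that the spark-submit binary is available on the Airflow workers, and that a Spark connection named spark_default exists; the application path, image and task_id are placeholders:

from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

# Reuses the `dag` object defined above; all paths and names are placeholders.
spark_job = SparkSubmitOperator(
    task_id='spark_submit_job',
    application='/path/to/your_job.py',   # your PySpark script or a .jar
    conn_id='spark_default',              # Spark connection configured in Airflow
    conf={'spark.kubernetes.container.image': 'your-spark-image:2.4.4'},
    dag=dag,
)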
Note: for each of these operators you need to ensure that your Airflow environment contains all the required dependencies for execution, as well as credentials configured to access the required services.
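If you want to keep something close to the pod YAML you already have, the KubernetesPodOperator is the closest match: it launches a pod from your DAG, so the dependency in question is the Kubernetes client used by the operator, and the credentials are whatever Airflow uses to reach your cluster. A rough sketch, again assuming Airflow 1.10.x, with image, namespace, master URL and application path as placeholders:

from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

spark_pod = KubernetesPodOperator(
    task_id='spark_on_k8s',
    name='spark-driver',
    namespace='default',
    image='your-spark-image:2.4.4',
    cmds=['/opt/spark/bin/spark-submit'],
    arguments=[
        '--master', 'k8s://https://<k8s-api-server>:443',
        '--deploy-mode', 'cluster',
        'local:///opt/spark/app/your_job.py',
    ],
    in_cluster=True,                # set False and pass config_file if Airflow runs outside the cluster
    is_delete_operator_pod=True,    # clean up the pod when the task finishes
    dag=dag,
)

With this approach Airflow only supervises the pod; spark-submit itself runs exactly as it does today from your YAML.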
You can also refer to this existing thread: