
How to pass parameter to PythonOperator in Airflow

Tags:

python

airflow

I just started using Airflow. Can anyone explain how to pass a parameter into a PythonOperator like the one below?

t5_send_notification = PythonOperator(
    task_id='t5_send_notification',
    provide_context=True,
    python_callable=SendEmail,
    op_kwargs=None,
    #op_kwargs=(key1='value1', key2='value2'),
    dag=dag,
)

def SendEmail(**kwargs):
    msg = MIMEText("The pipeline for client1 is completed, please check.")
    msg['Subject'] = "xxxx"
    msg['From'] = "xxxx"
    ......
    s = smtplib.SMTP('localhost')
    s.send_message(msg)
    s.quit()

I would like to pass some parameters into t5_send_notification's callable, SendEmail. Ideally I want to attach the full log, or part of it (which essentially comes from the kwargs), to the email being sent. I am guessing t5_send_notification is the place to gather that information.

Thank you very much.

mdivk asked Feb 26 '19 21:02


People also ask

What does Provide_context do in Airflow?

provide_context (bool) – if set to True, Airflow will pass a set of keyword arguments that can be used in your function. This set of kwargs corresponds exactly to what you can use in your Jinja templates.
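As a rough illustration of what those keyword arguments look like from inside a callable, the sketch below reads two context keys. It assumes the context contains the `ds` and `run_id` entries; the function name `describe_run` and the stand-in context values are hypothetical, and the function is called directly here rather than by Airflow.

```python
def describe_run(**context):
    """Summarise the run using values Airflow places in the task context."""
    # 'ds' is the execution date as YYYY-MM-DD; 'run_id' identifies the DAG run.
    # .get() is used so the function still works if a key is missing.
    ds = context.get("ds", "unknown date")
    run_id = context.get("run_id", "unknown run")
    return "run {} for {}".format(run_id, ds)


# Stand-in for the context Airflow would supply (hypothetical values).
fake_context = {"ds": "2019-02-26", "run_id": "manual__2019-02-26"}
summary = describe_run(**fake_context)
print(summary)
```

In Airflow 2 the context is passed to the callable automatically, so `provide_context=True` is only needed on the 1.x releases this question targets.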

What is a PythonOperator?

The PythonOperator in Airflow is responsible for running any Python code. Like the BashOperator, it and every other operator requires a task_id. The task_id is used to refer to the task when it runs and is displayed in the UI.


2 Answers

  1. Pass a dict object to op_kwargs.
  2. Use the keys to access the values from the kwargs dict in your Python callable.

    import smtplib
    from email.mime.text import MIMEText

    def SendEmail(**kwargs):
        # Values from op_kwargs arrive as keyword arguments
        print(kwargs['key1'])
        print(kwargs['key2'])
        msg = MIMEText("The pipeline for client1 is completed, please check.")
        msg['Subject'] = "xxxx"
        msg['From'] = "xxxx"
        # ......
        s = smtplib.SMTP('localhost')
        s.send_message(msg)
        s.quit()
    
    
    t5_send_notification = PythonOperator(
        task_id='t5_send_notification',
        provide_context=True,
        python_callable=SendEmail,
        op_kwargs={'key1': 'value1', 'key2': 'value2'},
        dag=dag,
    )
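Since the question asks about putting part of the log into the email, here is a minimal sketch of how values arriving through op_kwargs could shape the message. The helper `build_notification`, the keys `client` and `log_excerpt`, and the subject wording are all hypothetical and not part of the original answer; sending is left out so the example runs without an SMTP server.

```python
from email.mime.text import MIMEText


def build_notification(client, log_excerpt="", **kwargs):
    """Build the notification email from values supplied through op_kwargs."""
    body = "The pipeline for {} is completed, please check.".format(client)
    if log_excerpt:
        # Append the log text only when the caller provided some.
        body += "\n\nLog excerpt:\n" + log_excerpt
    msg = MIMEText(body)
    msg["Subject"] = "Pipeline completed: {}".format(client)  # hypothetical subject
    return msg


# Airflow would make the equivalent call if the operator were configured with
# op_kwargs={'client': 'client1', 'log_excerpt': 'step 3 finished'}.
msg = build_notification(client="client1", log_excerpt="step 3 finished")
print(msg["Subject"])
```

Keeping message construction separate from the SMTP call also makes the callable easy to check outside a DAG run.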
    
Ryan Yuan answered Oct 20 '22 00:10


PythonOperator has a named parameter, op_kwargs, which accepts a dict object.

import smtplib
from email.mime.text import MIMEText

# Define the callable before the operator that references it
def SendEmail(my_param, **kwargs):
    print(my_param)  # 'value1'
    msg = MIMEText("The pipeline for client1 is completed, please check.")
    msg['Subject'] = "xxxx"
    msg['From'] = "xxxx"
    # ......
    s = smtplib.SMTP('localhost')
    s.send_message(msg)
    s.quit()

t5_send_notification = PythonOperator(
    task_id='t5_send_notification',
    provide_context=True,
    python_callable=SendEmail,
    op_kwargs={"my_param": 'value1'},
    dag=dag,
)
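To see why a named parameter such as my_param picks up its value, the sketch below uses plain Python keyword unpacking. It is a simplified stand-in for what happens when the operator calls the function, not Airflow's actual code; `send_email` and the dictionary values are hypothetical.

```python
def send_email(my_param, **kwargs):
    """Return what the callable sees: the named argument and the leftover keys."""
    return my_param, sorted(kwargs)


# Stand-ins (hypothetical values) for op_kwargs and the Airflow context.
op_kwargs = {"my_param": "value1"}
context = {"ds": "2019-02-26", "run_id": "manual__2019-02-26"}

# Merge both dicts and unpack them as keyword arguments. Python binds
# 'my_param' to the named parameter; everything else lands in **kwargs.
merged = dict(context)
merged.update(op_kwargs)
result = send_email(**merged)
print(result)
```

Naming the parameters you care about in the signature keeps the function body free of repeated kwargs lookups, while **kwargs still absorbs the context entries.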
ethanenglish answered Oct 20 '22 00:10