I just started using Airflow, can anyone enlighten me how to pass a parameter into PythonOperator like below:
t5_send_notification = PythonOperator(
task_id='t5_send_notification',
provide_context=True,
python_callable=SendEmail,
op_kwargs=None,
#op_kwargs=(key1='value1', key2='value2'),
dag=dag,
)
def SendEmail(**kwargs):
msg = MIMEText("The pipeline for client1 is completed, please check.")
msg['Subject'] = "xxxx"
msg['From'] = "xxxx"
......
s = smtplib.SMTP('localhost')
s.send_message(msg)
s.quit()
I would like to be able to pass some parameters into the t5_send_notification
's callable which is SendEmail
, ideally I want to attach the full log and/or part of the log (which is essentially from the kwargs) to the email to be sent out, guessing the t5_send_notification
is the place to gather those information.
Thank you very much.
provide_context (bool) – if set to true, Airflow will pass a set of keyword arguments that can be used in your function. This set of kwargs correspond exactly to what you can use in your jinja templates.
The PythonOperator in Airflow is responsible for running any Python code. Just like the BashOperator used before, this and all other operators require a task_id . The task_id is referenced when running a task and displayed in the UI.
Use the keys to access their value from kwargs dict in your python callable
def SendEmail(**kwargs):
print(kwargs['key1'])
print(kwargs['key2'])
msg = MIMEText("The pipeline for client1 is completed, please check.")
msg['Subject'] = "xxxx"
msg['From'] = "xxxx"
......
s = smtplib.SMTP('localhost')
s.send_message(msg)
s.quit()
t5_send_notification = PythonOperator(
task_id='t5_send_notification',
provide_context=True,
python_callable=SendEmail,
op_kwargs={'key1': 'value1', 'key2': 'value2'},
dag=dag,
)
PythonOperator have a named parameter op_kwargs
and accepts dict
object.
have
t5_send_notification = PythonOperator(
task_id='t5_send_notification',
provide_context=True,
python_callable=SendEmail,
op_kwargs={"my_param":'value1'},
dag=dag,
)
def SendEmail(my_param,**kwargs):
print(my_param) #'value_1'
msg = MIMEText("The pipeline for client1 is completed, please check.")
msg['Subject'] = "xxxx"
msg['From'] = "xxxx"
......
s = smtplib.SMTP('localhost')
s.send_me
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With