I have written a DAG with multiple PythonOperators:
task1 = af_op.PythonOperator(task_id='Data_Extraction_Environment',
                             provide_context=True,
                             python_callable=Task1,
                             dag=dag1)

def Task1(**kwargs):
    return kwargs['dag_run'].conf.get('file')
From the PythonOperator I am calling the "Task1" method. That method returns a value, and I need to pass that value to the next PythonOperator. How can I get the value from the "task1" variable, or how can I get the value which is returned from the Task1 method?
Updated:
def Task1(**kwargs):
    file_name = kwargs['dag_run'].conf.get('file')
    task_instance = kwargs['task_instance']
    task_instance.xcom_push(key='file', value=file_name)
    return file_name

t1 = PythonOperator(task_id='Task1',
                    provide_context=True,
                    python_callable=Task1,
                    dag=dag)

t2 = BashOperator(
    task_id='Moving_bucket',
    bash_command='python /home/raw.py {{ task_instance.xcom_pull(task_ids="Task1", key="file") }} ',
    dag=dag,
)

t2.set_upstream(t1)
You might want to check out Airflow's XCOM: https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html
If you return a value from the callable, that value is stored in XCom. In your case, you could access it like so from other Python code:
task_instance = kwargs['task_instance']
task_instance.xcom_pull(task_ids='Task1')
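For example, here is a minimal sketch (not from the original code) of a second PythonOperator whose callable pulls the value returned by Task1. The task id 'use_file' and the callable name are made up, and the import path assumes Airflow 1.x, as in the question:

from airflow.operators.python_operator import PythonOperator  # Airflow 1.x import path

def use_file(**kwargs):
    # xcom_pull without a key fetches the value returned by the Task1 callable
    file_name = kwargs['task_instance'].xcom_pull(task_ids='Task1')
    print('Received from XCom: %s' % file_name)

t_use = PythonOperator(task_id='use_file',
                       provide_context=True,
                       python_callable=use_file,
                       dag=dag)  # reuses the dag object from the question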
or in a template like so:
{{ task_instance.xcom_pull(task_ids='Task1') }}
If you want to specify a key, you can push into XCom from inside a task:
task_instance = kwargs['task_instance']
task_instance.xcom_push(key='the_key', value=my_str)
Then later on you can access it like so:
task_instance.xcom_pull(task_ids='my_task', key='the_key')
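Put together, here is a sketch of a push/pull pair with an explicit key (the task ids 'my_task' and 'pull_task' and the pushed string are only illustrative):

def push_value(**kwargs):
    # Push under an explicit key instead of relying on the return value
    kwargs['task_instance'].xcom_push(key='the_key', value='some string')

def pull_value(**kwargs):
    # task_ids must match the pushing task's task_id, key must match the pushed key
    value = kwargs['task_instance'].xcom_pull(task_ids='my_task', key='the_key')
    print(value)

push = PythonOperator(task_id='my_task', provide_context=True,
                      python_callable=push_value, dag=dag)
pull = PythonOperator(task_id='pull_task', provide_context=True,
                      python_callable=pull_value, dag=dag)
push >> pull  # the pull only sees the value after my_task has run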
EDIT 1
Follow-up question: Instead of using the value in another function, how can I pass the value to another PythonOperator, like t2 = BashOperator(task_id='Moving_bucket', bash_command='python /home/raw.py "%s" ' % file_name, dag=dag)? I want to access the file_name which is returned by "Task1". How can this be achieved?
First of all, it seems to me that the value is, in fact, not being passed to another PythonOperator but to a BashOperator.
Secondly, this is already covered in my answer above. The field bash_command is templated (see template_fields in the source: https://github.com/apache/incubator-airflow/blob/master/airflow/operators/bash_operator.py). Hence, we can use the templated version:
BashOperator(
    task_id='Moving_bucket',
    bash_command='python /home/raw.py {{ task_instance.xcom_pull(task_ids="Task1") }} ',
    dag=dag,
)
EDIT 2
Explanation: Airflow works like this: it will execute Task1, then populate XCom, and then execute the next task. So for your example to work, Task1 needs to run first, and Moving_bucket needs to run downstream of Task1.
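In code, that ordering is just the dependency between the two tasks; with the t1 and t2 from your updated snippet, either form below works (the first is the set_upstream you already have, shown next to the equivalent bit-shift syntax):

t2.set_upstream(t1)  # Moving_bucket runs after Task1
# or, equivalently:
t1 >> t2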
Since you are returning the value from the function, you could also omit key='file' from xcom_pull and not manually push it in the function.
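A sketch of that simplified version (same task ids and file path as in your snippet; the only changes are dropping the explicit xcom_push and the key):

def Task1(**kwargs):
    # The return value is stored in XCom automatically (under the default key)
    return kwargs['dag_run'].conf.get('file')

t1 = PythonOperator(task_id='Task1', provide_context=True,
                    python_callable=Task1, dag=dag)

t2 = BashOperator(
    task_id='Moving_bucket',
    # No key needed: xcom_pull defaults to the value returned by Task1
    bash_command='python /home/raw.py {{ task_instance.xcom_pull(task_ids="Task1") }} ',
    dag=dag,
)

t1 >> t2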