
Airflow - How to pass xcom variable into Python function

Tags: airflow

I need to reference a variable that's returned by a BashOperator. In my task_archive_s3_file, I need to get the filename from get_s3_file. The task simply prints {{ ti.xcom_pull(task_ids=submit_file_to_spark) }} as a string instead of the value.

If I use the bash_command, the value prints correctly.

    get_s3_file = PythonOperator(
        task_id='get_s3_file',
        python_callable=obj.func_get_s3_file,
        trigger_rule=TriggerRule.ALL_SUCCESS,
        dag=dag)

    submit_file_to_spark = BashOperator(
        task_id='submit_file_to_spark',
        bash_command="echo 'hello world'",
        trigger_rule="all_done",
        xcom_push=True,
        dag=dag)

    task_archive_s3_file = PythonOperator(
        task_id='archive_s3_file',
    #    bash_command="echo {{ ti.xcom_pull(task_ids='submit_file_to_spark') }}",
        python_callable=obj.func_archive_s3_file,
        params={'s3_path_filename': "{{ ti.xcom_pull(task_ids=submit_file_to_spark) }}" },
        dag=dag)

    get_s3_file >> submit_file_to_spark >> task_archive_s3_file
sdot257 asked Sep 05 '17 15:09





1 Answer

Templates like {{ ti.xcom_pull(...) }} are only rendered inside parameters that support templating; anywhere else they reach your code as literal strings, which is what happens with params in your example. See the template_fields, template_fields_renderers and template_ext attributes of the PythonOperator and BashOperator for the parameters each one renders.
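To make the mechanism concrete, here is a toy model of it in plain Python. It is not Airflow's code: ToyOperator, render and the regex substitution are hypothetical stand-ins (real Airflow renders with Jinja), and the sketch only shows that attributes named in template_fields get rendered while everything else is left alone.

```python
import re

# Toy stand-in for Jinja: replaces "{{ name }}" with context[name].
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(value, context):
    """Render strings (and strings nested in dicts); leave other types untouched."""
    if isinstance(value, str):
        return _PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), value)
    if isinstance(value, dict):
        return {k: render(v, context) for k, v in value.items()}
    return value


class ToyOperator:
    # Hypothetical operator: only attributes named here are rendered.
    template_fields = ("templates_dict",)

    def __init__(self, params=None, templates_dict=None):
        self.params = params or {}
        self.templates_dict = templates_dict or {}

    def render_templates(self, context):
        for name in self.template_fields:
            setattr(self, name, render(getattr(self, name), context))


op = ToyOperator(
    params={"s3_path_filename": "{{ filename }}"},
    templates_dict={"s3_path_filename": "{{ filename }}"},
)
op.render_templates({"filename": "data.csv"})

print(op.params["s3_path_filename"])          # still the literal "{{ filename }}"
print(op.templates_dict["s3_path_filename"])  # rendered: "data.csv"
```

The same template string ends up rendered in one attribute and untouched in the other, purely because of where it was passed.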

So templates_dict is what you use to pass templates to your PythonOperator (note that the task id inside the template must be quoted):

    def func_archive_s3_file(**context):
        # templates_dict arrives already rendered inside the context
        archive(context['templates_dict']['s3_path_filename'])

    task_archive_s3_file = PythonOperator(
        task_id='archive_s3_file',
        dag=dag,
        python_callable=obj.func_archive_s3_file,
        # must pass this because templates_dict gets passed via context
        # (needed on Airflow 1.x; Airflow 2 passes the context automatically)
        provide_context=True,
        templates_dict={'s3_path_filename': "{{ ti.xcom_pull(task_ids='submit_file_to_spark') }}" })

However, if all you need is an XCom value, an alternative is to use the TaskInstance object made available to you via the context:

    def func_archive_s3_file(**context):
        # pull the value pushed by the upstream task straight from the TaskInstance
        archive(context['ti'].xcom_pull(task_ids='submit_file_to_spark'))

    task_archive_s3_file = PythonOperator(
        task_id='archive_s3_file',
        dag=dag,
        python_callable=obj.func_archive_s3_file,
        provide_context=True)
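A side benefit of the context-based version is that the callable can be exercised without a running scheduler. The following is only a sketch: StubTaskInstance is a hypothetical stand-in that mimics nothing but xcom_pull(task_ids=...), and archive is a placeholder, since the real archiving logic is not shown in the question.

```python
class StubTaskInstance:
    """Hypothetical stand-in exposing only xcom_pull(task_ids=...)."""

    def __init__(self, xcoms):
        self._xcoms = xcoms  # maps task_id -> value pushed by that task

    def xcom_pull(self, task_ids):
        return self._xcoms.get(task_ids)


archived = []


def archive(path):
    # Placeholder for the real archiving logic; it just records the argument.
    archived.append(path)


def func_archive_s3_file(**context):
    archive(context["ti"].xcom_pull(task_ids="submit_file_to_spark"))


# Call the function the way the operator would, with "ti" in the context.
func_archive_s3_file(ti=StubTaskInstance({"submit_file_to_spark": "hello world"}))
print(archived)  # ['hello world']
```

Here "hello world" stands for the value submit_file_to_spark would push, since a BashOperator with xcom_push=True pushes the last line of its output.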
Daniel Huang answered Oct 02 '22 07:10
