I need to reference a variable that's returned by a BashOperator
. In my task_archive_s3_file
, I need to get the filename from get_s3_file
. The task simply prints {{ ti.xcom_pull(task_ids=submit_file_to_spark) }}
as a string instead of the value.
If I use the bash_command
, the value prints correctly.
get_s3_file = PythonOperator( task_id='get_s3_file', python_callable=obj.func_get_s3_file, trigger_rule=TriggerRule.ALL_SUCCESS, dag=dag) submit_file_to_spark = BashOperator( task_id='submit_file_to_spark', bash_command="echo 'hello world'", trigger_rule="all_done", xcom_push=True, dag=dag) task_archive_s3_file = PythonOperator( task_id='archive_s3_file', # bash_command="echo {{ ti.xcom_pull(task_ids='submit_file_to_spark') }}", python_callable=obj.func_archive_s3_file, params={'s3_path_filename': "{{ ti.xcom_pull(task_ids=submit_file_to_spark) }}" }, dag=dag) get_s3_file >> submit_file_to_spark >> task_archive_s3_file
Pulling a XCom with xcom_pull In order to pull a XCom from a task, you have to use the xcom_pull method. Like xcom_push, this method is available through a task instance object. xcom_pull expects 2 arguments: task_ids, only XComs from tasks matching ids will be pulled.
Templates like {{ ti.xcom_pull(...) }}
can only be used inside of parameters that support templates or they won't be rendered prior to execution. See the template_fields
, template_fields_renderers
and template_ext
attributes of the PythonOperator and BashOperator.
So templates_dict
is what you use to pass templates to your python operator:
def func_archive_s3_file(**context): archive(context['templates_dict']['s3_path_filename']) task_archive_s3_file = PythonOperator( task_id='archive_s3_file', dag=dag, python_callable=obj.func_archive_s3_file, provide_context=True, # must pass this because templates_dict gets passed via context templates_dict={'s3_path_filename': "{{ ti.xcom_pull(task_ids='submit_file_to_spark') }}" })
However in the case of fetching an XCom value, another alternative is just using the TaskInstance
object made available to you via context:
def func_archive_s3_file(**context): archive(context['ti'].xcom_pull(task_ids='submit_file_to_spark')) task_archive_s3_file = PythonOperator( task_id='archive_s3_file', dag=dag, python_callable=obj.func_archive_s3_file, provide_context=True,
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With