Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Airflow - xcom value acess into custom operator

Tags:

airflow

I am using Airlfow, since last 6 months. I felt so happy to define the workflows in Airflow. I have the below scenario where I am not able to get the xcom value (highlighted in yellow color).

Please find the code below sample code:

Work Flow

def push_function(**context):
context['ti'].xcom_push(key='reportid', value='xyz')

dummy_operator = DummyOperator(
task_id='Start',
dag=main_dag
)

push_function_task = PythonOperator(
    task_id='push_function',
    provide_context=True,
    python_callable=push_function,
    op_kwargs={},
    dag=main_dag)


push_function_task .set_upstream(dummy_operator)

custom_task = CustomOperator(
        dag=main_dag,
        task_id='import_data',
        provide_context=True,
        url="http://www.google.com/{}".format("{{task_instance.xcom_pull(task_ids='push_function')}}")

     )

custom_task .set_upstream(push_function_task)

Notes: 1. CustomOperator is my own operator wtritten for downloading the data for the given URL

Please help me.

Thanks, Samanth

like image 723
user2661475 Avatar asked Mar 05 '23 13:03

user2661475


1 Answers

I believe you have a mismatch in keys when pushing and pulling the XCom. Each XCom value is tied to a DAG ID, task ID, and key. If you are pushing with report_id key, then you need to pull with it as well.

Note, if a key is not specified to xcom_pull(), it uses the default of return_value. This is because if a task returns a result, Airflow will automatically push it to XCom under the return_value key.

This gives you two options to fix your issue:

1) Continue to push to the report_id key and make sure you pull from it as well

def push_function(**context):
    context['ti'].xcom_push(key='reportid', value='xyz')

...

custom_task = CustomOperator(
    ...
    url="http://www.google.com/{}".format("{{ task_instance.xcom_pull(task_ids='push_function', key='reportid') }}")
)

2) Have push_function() return the value you want to push to XCom, then pull from the default key.

def push_function(**context):
    return 'xyz'

...

custom_task = CustomOperator(
    ...
    url="http://www.google.com/{}".format("{{ task_instance.xcom_pull(task_ids='push_function') }}")
)
like image 114
Daniel Huang Avatar answered May 08 '23 11:05

Daniel Huang