
Airflow xcom_pull does not return the data of the same upstream task instance run; it returns the most recent data instead

Tags:

python

airflow

I am creating an Airflow @daily DAG. It has an upstream task, get_daily_data (a BigQueryGetDataOperator), which fetches data based on execution_date, and a dependent downstream task (a PythonOperator) that uses this date-based data via xcom_pull. When I run the airflow backfill command, the downstream task process_data_from_bq, where I call xcom_pull, gets only the most recent data, not the data for the same execution date that it expects. The Airflow documentation says: "If xcom_pull is passed a single string for task_ids, then the most recent XCom value from that task is returned." However, it does not say how to get the data of the same instance of the DAG run.
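To make the documentation's "most recent" wording concrete, here is a toy model of the default lookup, based on my reading of Airflow 1.10's TaskInstance.xcom_pull and XCom.get_one. It is an assumption-labelled sketch, not Airflow code, and XComRow / xcom_pull_model are hypothetical names. With the default include_prior_dates=False, the lookup is restricted to the current run's execution_date, so "most recent" only breaks ties among XComs pushed for that same date.

```python
from dataclasses import dataclass
from datetime import date
from typing import Any, List, Optional, Sequence, Tuple, Union


@dataclass
class XComRow:
    # One stored XCom: pushing task, DAG run date, key, value, push order.
    task_id: str
    execution_date: date
    key: str
    value: Any
    timestamp: int  # larger means pushed later


def xcom_pull_model(
    rows: List[XComRow],
    current_execution_date: date,
    task_ids: Union[str, Sequence[str]],
    key: str = "return_value",
) -> Union[Optional[Any], Tuple[Optional[Any], ...]]:
    """Toy model of the default lookup (include_prior_dates=False); not Airflow code."""

    def pull_one(task_id: str) -> Optional[Any]:
        # Only XComs pushed by this task, under this key, for THIS run's date.
        matches = [
            r for r in rows
            if r.task_id == task_id
            and r.key == key
            and r.execution_date == current_execution_date
        ]
        if not matches:
            return None
        # "Most recent" only orders XComs that share the current execution_date.
        return max(matches, key=lambda r: r.timestamp).value

    if isinstance(task_ids, str):
        return pull_one(task_ids)  # single string -> single value
    return tuple(pull_one(t) for t in task_ids)  # sequence -> one value per task


rows = [
    XComRow("get_daily_data", date(2022, 10, 1), "return_value", ["rows for 2022-10-01"], 1),
    XComRow("get_daily_data", date(2022, 10, 3), "return_value", ["rows for 2022-10-03"], 2),
]

# The 2022-10-01 run gets its own value even though 2022-10-03 was pushed later.
print(xcom_pull_model(rows, date(2022, 10, 1), "get_daily_data"))  # ['rows for 2022-10-01']
```

Under this model, a run never receives another date's value through the default pull; it gets either its own value or None.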

I went through an existing question about the same problem, How to pull xcom value from other task instance in the same DAG run (not the most recent one)?, but the one solution given there is what I am already doing, and it does not seem to be the correct answer.

DAG definition:

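# Airflow 1.10 import paths (provide_context=True below implies Airflow 1.x)
from airflow import DAG
from airflow.contrib.operators.bigquery_get_data import BigQueryGetDataOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators.python_operator import PythonOperator

# default_args, policy_by_transaction_date_sql, IibListToObject and
# save_daily_date_wise are assumed to be defined elsewhere in the project.

dag = DAG(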
    'daily_motor',
    default_args=default_args,
    schedule_interval='@daily'
)

# This task writes the data for the execution date into a BigQuery table
extract_daily_data = BigQueryOperator(
    task_id='daily_data_extract',
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',
    allow_large_results=True,
    sql=policy_by_transaction_date_sql('{{ ds }}'), 
    destination_dataset_table='Test.daily_data_tmp',
    dag=dag)


get_daily_data = BigQueryGetDataOperator(
    task_id='get_daily_data',
    dataset_id='Test',
    table_id='daily_data_tmp',
    max_results='10000',
    dag=dag
)


# This is where I need to pull the data of the same execution date (the same DAG run), not of the most recent task run

def process_bq_data(**kwargs):
    bq_data = kwargs['ti'].xcom_pull(task_ids='get_daily_data')
    # This bq_data is the most recent value, not the one for the same execution date
    obj_creator = IibListToObject()
    items = obj_creator.create(bq_data, 'daily')
    save_daily_date_wise(items)


process_data = PythonOperator(
    task_id='process_data_from_bq',
    python_callable=process_bq_data,
    provide_context=True,
    dag=dag
)

get_daily_data.set_upstream(extract_daily_data)
process_data.set_upstream(get_daily_data)
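A minimal sketch of the callable that spells out the pull arguments and logs which run it is processing, assuming Airflow 1.x with provide_context=True (so kwargs contains 'ti' and 'ds'). FakeTaskInstance is a hypothetical stand-in so the function can be exercised without a scheduler, and the question's IibListToObject / save_daily_date_wise steps are left out.

```python
from typing import Any, Dict, List, Optional


def process_bq_data(**kwargs: Any) -> List[Any]:
    # Assumes provide_context=True (Airflow 1.x): kwargs holds 'ti' and 'ds'.
    ti = kwargs["ti"]
    ds = kwargs["ds"]  # this DAG run's execution date, e.g. '2022-10-01'
    rows: Optional[List[Any]] = ti.xcom_pull(
        task_ids="get_daily_data",
        key="return_value",         # key under which an operator's result is stored
        include_prior_dates=False,  # only XComs from this run's execution_date
    )
    rows = rows or []
    # Log the run date with the row count so backfill logs show what each run got.
    print(f"[{ds}] pulled {len(rows)} rows from get_daily_data")
    # The question's IibListToObject().create(...) and save_daily_date_wise(...)
    # would process `rows` here.
    return rows


class FakeTaskInstance:
    """Hypothetical test double: records pull arguments, returns one run's value."""

    def __init__(self, value: Optional[List[Any]]) -> None:
        self.value = value
        self.calls: List[Dict[str, Any]] = []

    def xcom_pull(self, task_ids: str, key: str = "return_value",
                  include_prior_dates: bool = False) -> Optional[List[Any]]:
        self.calls.append({"task_ids": task_ids, "key": key,
                           "include_prior_dates": include_prior_dates})
        return self.value


fake_ti = FakeTaskInstance([["policy-1", 100], ["policy-2", 250]])
pulled = process_bq_data(ti=fake_ti, ds="2022-10-01")
# prints: [2022-10-01] pulled 2 rows from get_daily_data
```

In a real DAG, returning the rows would also push them as this task's own XCom, so you may prefer to return nothing once the logging has confirmed which run's data each task instance received.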
Shiv asked Oct 28 '22 13:10


1 Answer

You are probably receiving the latest XCom value. You also need to make sure the values actually come from the same execution_date, as intended. xcom_pull controls this with its include_prior_dates parameter, which defaults to False:

:param include_prior_dates: If False, only XComs from the current execution_date are returned. If True, XComs from previous dates are returned as well.
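The effect of include_prior_dates can be illustrated with an even smaller toy model of one task's XComs, keyed by execution date. This is a sketch under my own assumptions about the lookup, not Airflow's implementation, and pull_with_prior_dates is a hypothetical name. In neither mode does a run see XComs pushed by a later execution_date.

```python
from datetime import date
from typing import Any, Dict, Optional


def pull_with_prior_dates(
    pushed: Dict[date, Any],
    current_execution_date: date,
    include_prior_dates: bool = False,
) -> Optional[Any]:
    """Toy model of include_prior_dates for one task's XComs (not Airflow code).

    `pushed` maps execution_date -> value pushed by that run.
    """
    if include_prior_dates:
        # Current run or any earlier run; the newest eligible date wins.
        eligible = [d for d in pushed if d <= current_execution_date]
    else:
        # Only the current run's own XCom.
        eligible = [d for d in pushed if d == current_execution_date]
    return pushed[max(eligible)] if eligible else None


pushed = {date(2022, 10, 1): "data for 10-01", date(2022, 10, 3): "data for 10-03"}

print(pull_with_prior_dates(pushed, date(2022, 10, 2)))        # None
print(pull_with_prior_dates(pushed, date(2022, 10, 2), True))  # data for 10-01
```

So for a backfilled run whose own XCom is missing, include_prior_dates=True falls back to an earlier run's value, while the default returns None.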

Enrique answered Nov 09 '22 05:11