I am creating an Airflow @daily DAG. It has an upstream task, get_daily_data, a BigQueryGetDataOperator that fetches data based on execution_date, and a downstream dependent task (a PythonOperator) that uses this date-based data via xcom_pull. When I run the airflow backfill command, the downstream task process_data_from_bq, where I call xcom_pull, gets only the most recent data, not the data for the same execution date that the downstream task expects.
The Airflow documentation says that if xcom_pull is passed a single string for task_ids, the most recent XCom value from that task is returned.
However, it does not say how to get the data from the same instance of the DAG run.
I went through a similar question, How to pull xcom value from other task instance in the same DAG run (not the most recent one)?, but the one solution given there is what I am already doing, and it does not seem to be the correct answer.
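To make the documented rule concrete, here is a minimal pure-Python model of how a single-string task_ids pull picks its value. It is a hypothetical sketch, not Airflow's implementation: XComRow and pull_latest are illustrative names, and it assumes that "most recent" means newest execution date first, then newest write, among only the rows allowed by include_prior_dates (which defaults to False in TaskInstance.xcom_pull).

```python
from dataclasses import dataclass
from datetime import date, datetime
from typing import Any, List, Optional


@dataclass
class XComRow:
    """Simplified, hypothetical stand-in for one row of Airflow's xcom table."""
    dag_id: str
    task_id: str
    execution_date: date
    timestamp: datetime
    value: Any


def pull_latest(rows: List[XComRow], dag_id: str, task_id: str,
                current_date: date, include_prior_dates: bool = False) -> Optional[Any]:
    """Model of a single-string task_ids pull: newest value among qualifying rows."""
    def qualifies(row: XComRow) -> bool:
        if row.dag_id != dag_id or row.task_id != task_id:
            return False
        if include_prior_dates:
            # Earlier execution dates also qualify.
            return row.execution_date <= current_date
        # Default: only the current execution date qualifies.
        return row.execution_date == current_date

    candidates = [row for row in rows if qualifies(row)]
    if not candidates:
        return None
    # "Most recent": newest execution date first, then newest write time.
    newest = max(candidates, key=lambda row: (row.execution_date, row.timestamp))
    return newest.value


# The 2019-01-01 value was written later than the 2019-01-02 one, as can happen in a backfill.
rows = [
    XComRow("daily_motor", "get_daily_data", date(2019, 1, 1), datetime(2019, 3, 1, 10, 0), "rows for 2019-01-01"),
    XComRow("daily_motor", "get_daily_data", date(2019, 1, 2), datetime(2019, 3, 1, 9, 0), "rows for 2019-01-02"),
]
print(pull_latest(rows, "daily_motor", "get_daily_data", date(2019, 1, 1)))  # rows for 2019-01-01
```

In this model, the default pull for a given date never returns another date's value; only include_prior_dates=True lets an earlier date's value leak into a run that has none of its own.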
DAG definition:
```python
from airflow import DAG
from airflow.contrib.operators.bigquery_get_data import BigQueryGetDataOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators.python_operator import PythonOperator

# default_args, policy_by_transaction_date_sql, IibListToObject and
# save_daily_date_wise are defined elsewhere in the project.

dag = DAG(
    'daily_motor',
    default_args=default_args,
    schedule_interval='@daily'
)

# This task creates data in a BigQuery table based on the execution date
extract_daily_data = BigQueryOperator(
    task_id='daily_data_extract',
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',
    allow_large_results=True,
    sql=policy_by_transaction_date_sql('{{ ds }}'),
    destination_dataset_table='Test.daily_data_tmp',
    dag=dag)

get_daily_data = BigQueryGetDataOperator(
    task_id='get_daily_data',
    dataset_id='Test',
    table_id='daily_data_tmp',
    max_results='10000',
    dag=dag
)


# This is where I need to pull the data of the same execution date / same DAG run,
# not the most recent task run
def process_bq_data(**kwargs):
    bq_data = kwargs['ti'].xcom_pull(task_ids='get_daily_data')
    # This bq_data is the most recent one, not the one for the same execution date
    obj_creator = IibListToObject()
    items = obj_creator.create(bq_data, 'daily')
    save_daily_date_wise(items)


process_data = PythonOperator(
    task_id='process_data_from_bq',
    python_callable=process_bq_data,
    provide_context=True,
    dag=dag
)

get_daily_data.set_upstream(extract_daily_data)
process_data.set_upstream(get_daily_data)
```
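Independent of XCom, the DAG above writes every run's extract into the same Test.daily_data_tmp table with WRITE_TRUNCATE, and backfill can run several dates concurrently. The following is a hypothetical pure-Python simulation (no Airflow or BigQuery involved; simulate_backfill and the per-date table names are illustrative) of what get_daily_data would read under a worst-case interleaving where all extracts finish before any read starts.

```python
from typing import Dict, List


def simulate_backfill(dates: List[str], per_date_table: bool) -> Dict[str, List[str]]:
    """Return, for each execution date, the rows its get_daily_data step would read."""
    tables: Dict[str, List[str]] = {}   # table name -> current contents
    read_by: Dict[str, List[str]] = {}  # execution date -> rows it read

    def table_for(ds: str) -> str:
        if per_date_table:
            # Hypothetical per-run name, e.g. daily_data_tmp_20190101.
            return "daily_data_tmp_" + ds.replace("-", "")
        return "daily_data_tmp"  # the single shared table used in the DAG above

    # Worst-case interleaving: every extract finishes before any read starts.
    for ds in dates:
        tables[table_for(ds)] = ["row for " + ds]  # WRITE_TRUNCATE replaces the contents
    for ds in dates:
        read_by[ds] = list(tables[table_for(ds)])
    return read_by


dates = ["2019-01-01", "2019-01-02", "2019-01-03"]
print(simulate_backfill(dates, per_date_table=False)["2019-01-01"])  # ['row for 2019-01-03']
print(simulate_backfill(dates, per_date_table=True)["2019-01-01"])   # ['row for 2019-01-01']
```

With a shared table, each run's XCom is correctly tied to its own execution date, yet the rows inside it belong to whichever extract ran last. In the DAG, a per-date table might be expressed with templates such as destination_dataset_table='Test.daily_data_tmp_{{ ds_nodash }}' and table_id='daily_data_tmp_{{ ds_nodash }}', assuming both fields are in the operators' template_fields for the Airflow version in use.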
You are probably receiving the latest XCom value. You also need to make sure the values actually come from the same execution_date, as they are supposed to. The xcom_pull docstring says:
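:param include_prior_dates:
    If False, only XComs from the current execution_date are returned. If True, XComs from previous dates are returned as well.

Since include_prior_dates defaults to False, "most recent" in the documentation means the most recent value for the current execution_date. If the pulled rows still belong to another date, the XCom itself is likely fine and the rows were already wrong when get_daily_data read them: every run writes to the same Test.daily_data_tmp table with WRITE_TRUNCATE, so overlapping backfill runs can overwrite each other's data.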
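A minimal sketch of that check inside the PythonOperator callable, assuming the query behind policy_by_transaction_date_sql selects the transaction date as the first column and that it comes back as a YYYY-MM-DD string (DATE_COLUMN and rows_from_other_dates are hypothetical names). It passes include_prior_dates explicitly and fails the task instead of silently processing rows from another date.

```python
from typing import Any, List, Sequence

DATE_COLUMN = 0  # hypothetical: index of the transaction-date column in each row


def rows_from_other_dates(rows: Sequence[Sequence[Any]], ds: str,
                          date_col: int = DATE_COLUMN) -> List[Sequence[Any]]:
    """Return the rows whose date column is not the run's ds (a YYYY-MM-DD string)."""
    return [row for row in rows if str(row[date_col]) != ds]


def process_bq_data(**kwargs):
    ti = kwargs["ti"]
    ds = kwargs["ds"]  # the run's execution date, provided in the task context
    # include_prior_dates=False is already the default; passing it states the intent.
    bq_data = ti.xcom_pull(task_ids="get_daily_data", include_prior_dates=False) or []
    stray = rows_from_other_dates(bq_data, ds)
    if stray:
        raise ValueError(
            "%d of %d rows are not from %s; the source table may have been "
            "overwritten by another run" % (len(stray), len(bq_data), ds))
    # From here, continue as in the question (IibListToObject, save_daily_date_wise).
    # Returned only so the check is easy to exercise outside Airflow.
    return bq_data
```

Outside Airflow, the callable can be exercised with any stand-in object that has an xcom_pull method, since it only reads kwargs["ti"] and kwargs["ds"].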