I was reading the API Docs, and was unclear what the context argument is for BaseOperator.xcom_pull.
I thought it would be dag.default_args, but I receive KeyError: 'ti'
I performed an xcom_push within a prior task following the push() example here.
Context Manager: added in Airflow 1.8. DAGs can be used as context managers to automatically assign new operators to that DAG:

    with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
        op = DummyOperator('op')

    op.dag is dag  # True
XCom push/pull just adds/retrieves a row from the xcom table in the airflow DB based on DAG id, execution date, task id, and key.
XComs (short for “cross-communications”) are a mechanism that let Tasks talk to each other, as by default Tasks are entirely isolated and may be running on entirely different machines. An XCom is identified by a key (essentially its name), as well as the task_id and dag_id it came from.
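As a rough mental model of the two descriptions above, the xcom table can be pictured as a mapping from (DAG id, execution date, task id, key) to a value. The sketch below is not Airflow's implementation: the dict and both function names are hypothetical stand-ins, and only the default key "return_value" mirrors what Airflow uses for a task's return value.

```python
from datetime import datetime

# Hypothetical in-memory stand-in for the xcom table in the Airflow DB.
# Each "row" is addressed by (dag_id, execution_date, task_id, key).
xcom_table = {}


def xcom_push(dag_id, execution_date, task_id, key, value):
    """Add (or overwrite) one row."""
    xcom_table[(dag_id, execution_date, task_id, key)] = value


def xcom_pull(dag_id, execution_date, task_id, key="return_value"):
    """Retrieve one row, or None if no matching row exists."""
    return xcom_table.get((dag_id, execution_date, task_id, key))


run_date = datetime(2016, 1, 1)
xcom_push("my_dag", run_date, "push_task", "return_value", {"rows": 3})

# Same DAG, same execution date, same task: the row is found.
pulled = xcom_pull("my_dag", run_date, "push_task")

# A different execution date addresses a different row, so nothing comes back.
other_run = xcom_pull("my_dag", datetime(2016, 1, 2), "push_task")
```

The point of the sketch is that a pull only succeeds when all four parts of the address match what was pushed.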
The xcom_pull() method pulls a list of return values from one or more Airflow tasks. Note the plural in the name of its first argument, task_ids: pass a list of the task IDs whose XCom values you want to fetch.
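The single-ID and list forms of the call can be sketched without a running Airflow install. StubTaskInstance below is a hypothetical stand-in, not Airflow's TaskInstance, and the task IDs are made up; the exact container type the real method returns for a list of IDs depends on the Airflow version, so treat the list here as an assumption.

```python
class StubTaskInstance:
    """Hypothetical stand-in for a task instance; not Airflow's class."""

    def __init__(self, stored):
        # Maps task_id -> the value that task pushed as its return value.
        self._stored = stored

    def xcom_pull(self, task_ids):
        # A single task ID yields a single value.
        if isinstance(task_ids, str):
            return self._stored.get(task_ids)
        # Several task IDs yield one value per ID, in the order requested.
        return [self._stored.get(task_id) for task_id in task_ids]


ti = StubTaskInstance({"extract_a": 1, "extract_b": 2})

single = ti.xcom_pull(task_ids="extract_a")
many = ti.xcom_pull(task_ids=["extract_a", "extract_b"])
```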
The context is a set of keyword arguments containing reference objects related to a task instance, such as dag, dag_run, run_id, execution_date, etc. (including the task instance ti itself).
The default context is generated when a task instance runs, and is defined here.
In the example you mentioned, the way the context is passed in isn't obvious: if the provide_context arg is set to True, Airflow passes the generated context to the Python callable as keyword arguments. You can then access the task instance's xcom_pull method by calling kwargs['ti'].xcom_pull().
That is why, for this to work, you need to define **kwargs in your Python callable's signature and set the operator's provide_context arg to True.
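Here is a minimal sketch of that pattern, runnable without Airflow. StubTaskInstance, pull_callable, and the task ID "push_task" are hypothetical names used for illustration; in a real Airflow 1.x DAG the context dict would be built by Airflow and handed to the callable when the operator is created with provide_context=True. The last part shows why passing something like dag.default_args instead leads to KeyError: 'ti'.

```python
class StubTaskInstance:
    """Hypothetical stand-in for the task instance Airflow puts in the context."""

    def __init__(self, xcoms):
        # Maps (task_id, key) -> pushed value.
        self._xcoms = xcoms

    def xcom_pull(self, task_ids, key="return_value"):
        return self._xcoms.get((task_ids, key))


def pull_callable(**kwargs):
    # The context arrives as keyword arguments; 'ti' is the task instance.
    ti = kwargs["ti"]
    return ti.xcom_pull(task_ids="push_task")


# In Airflow 1.x the callable would be wired up roughly like this:
#   PythonOperator(task_id="pull_task", python_callable=pull_callable,
#                  provide_context=True, dag=dag)
# Here the context is simulated by hand.
context = {
    "ti": StubTaskInstance({("push_task", "return_value"): "hello"}),
    "run_id": "manual__example",
}
result = pull_callable(**context)

# A dict without a 'ti' entry (for example, something shaped like
# default_args) is not a task context, so the lookup fails.
try:
    pull_callable(**{"owner": "airflow"})
    missing_ti = False
except KeyError:
    missing_ti = True
```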
I am not sure how to directly get hold of the context reference though.