I have a custom operator that pushes an XCom value as shown below:
...
task_instance = context['task_instance']
task_instance.xcom_push("list_of_files",file_list)
...
It works fine. I have a DAG definition file (my_dag.py) in which I create a task using my own operator. The task pushes the XCom value, and I then want to run a for loop over that value in the DAG file. How do I pull it?
You can pass parameters from the CLI using --conf '{"key":"value"}' and then use them in the DAG file as "{{ dag_run.conf["key"] }}" in a templated field.
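The same conf dictionary can also be read from the task context inside a Python callable. Here is a minimal sketch, assuming the callable receives the context as keyword arguments; `use_conf` and the `SimpleNamespace` stand-in for the DAG run are hypothetical and only there so the snippet runs without an Airflow installation.

```python
from types import SimpleNamespace


def use_conf(**context):
    # dag_run.conf can be None when the run was not triggered with --conf,
    # so fall back to an empty dict before looking up the key.
    conf = context["dag_run"].conf or {}
    return conf.get("key", "default")


# Hypothetical stand-in for the DagRun object Airflow would put in the context.
fake_run = SimpleNamespace(conf={"key": "value"})
result = use_conf(dag_run=fake_run)

# A run that was triggered without --conf falls back to the default.
fallback = use_conf(dag_run=SimpleNamespace(conf=None))
```

Like XCom, this value only exists once a run has been triggered, so it is read at task run time and not while the DAG file is parsed.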
XCom push/pull just adds/retrieves a row from the xcom table in the airflow DB based on DAG id, execution date, task id, and key.
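To make the "row in a table" idea concrete, here is a toy model using an in-memory SQLite database. It is only an illustration: the table layout is simplified and is not Airflow's real schema, and `toy_push` / `toy_pull` are hypothetical helpers, not Airflow functions.

```python
import json
import sqlite3

# Toy stand-in for the Airflow metadata DB; the real xcom table has more columns.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE xcom (
        dag_id TEXT, execution_date TEXT, task_id TEXT, key TEXT, value TEXT,
        PRIMARY KEY (dag_id, execution_date, task_id, key)
    )
    """
)


def toy_push(dag_id, execution_date, task_id, key, value):
    """Store one JSON-encoded value, replacing any earlier row with the same identifiers."""
    conn.execute(
        "INSERT OR REPLACE INTO xcom VALUES (?, ?, ?, ?, ?)",
        (dag_id, execution_date, task_id, key, json.dumps(value)),
    )


def toy_pull(dag_id, execution_date, task_id, key):
    """Return the value stored under the four identifiers, or None if no row exists."""
    row = conn.execute(
        "SELECT value FROM xcom "
        "WHERE dag_id = ? AND execution_date = ? AND task_id = ? AND key = ?",
        (dag_id, execution_date, task_id, key),
    ).fetchone()
    return json.loads(row[0]) if row else None


toy_push("my_dag", "2020-01-01", "list_files", "list_of_files", ["a.csv", "b.csv"])
found = toy_pull("my_dag", "2020-01-01", "list_files", "list_of_files")
missing = toy_pull("my_dag", "2020-01-02", "list_files", "list_of_files")
```

The second lookup returns nothing because the execution date differs: a value pushed in one run is not visible under another run's identifiers.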
Airflow uses XComs to pass data between operators. If the flow is operator A -> operator B, then operator A must "push" a value to XCom, and operator B must "pull" this value from A if it wants to read it.
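You can't access the XCom value in your DAG definition: the DAG file is parsed before any task runs, so the value does not exist yet at that point. It is only available inside operators at run time, by supplying the provide_context=True argument to the operator's constructor.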
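If the loop can live inside a downstream task rather than in the DAG file, the pull can happen at run time. Below is a minimal sketch of that pattern, assuming both callables receive the context as keyword arguments. `FakeTaskInstance`, the task ids, and the file names are hypothetical; the stub only imitates the `xcom_push` / `xcom_pull` calls so the snippet runs without an Airflow installation.

```python
class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance (not the real class)."""

    _store = {}  # shared by all instances, playing the role of the xcom table

    def __init__(self, task_id):
        self.task_id = task_id

    def xcom_push(self, key, value):
        self._store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids, key):
        return self._store.get((task_ids, key))


def push_files(**context):
    # Same pattern as in the question: push from inside the running task.
    context["task_instance"].xcom_push("list_of_files", ["a.csv", "b.csv"])


def process_files(**context):
    # Pull inside the downstream task and run the for loop here, at run time.
    files = context["task_instance"].xcom_pull(task_ids="list_files", key="list_of_files")
    return [name.upper() for name in files]


push_files(task_instance=FakeTaskInstance("list_files"))
result = process_files(task_instance=FakeTaskInstance("process_files"))
```

With this approach the number of tasks in the DAG stays fixed; only the work done inside the second task depends on the pushed list.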
In the case where you want to use data from an operator in your DAG structure itself, you would need to perform the actual work your operator is doing outside of an operator.
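def get_file_list():
    hook = SomeHook()
    # Return the result so the loop below has something to iterate over
    return hook.run('something to get file list')

dag = DAG('tutorial', default_args=default_args)
for i, file in enumerate(get_file_list()):
    # Each task needs a unique task_id; do something with the file passed as a parameter
    task = SomeOperator(task_id='process_file_{}'.format(i), params={'file': file}, dag=dag)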
It is generally bad practice to access xcom from the dag itself rather than from a task in the dag. That said, sometimes it is necessary. For example, you may need to do this when dynamically creating dags.
Here is an example of me pulling some unrun jobs within a dag. I'm using this in the context of a subdag, so I can rest assured that the xcom will always contain the information assuming the method is running.
xcom_unrun_jobs = None
if len(parent_dag.get_active_runs()) > 0:
    tis = parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1]
    xcom_unrun_jobs = tis.xcom_pull(dag_id=parent_dag._dag_id, task_ids=unrun_job_task_id)