 

Apache Airflow: How to xcom_pull() a value into a DAG?

I have a custom operator which pushes an XCom value as below:

...
task_instance = context['task_instance']
task_instance.xcom_push("list_of_files", file_list)
...

It works fine. I have a DAG definition file (my_dag.py) where I create a task using my own operator; it pushes the XCom value, and then I want to run a for loop over that XCom value. How do I pull it?

asked Jul 20 '17 at 15:07 by Vadzim Nemchenko


People also ask

How do you pass parameters in Airflow DAG?

You can pass parameters from the CLI using --conf '{"key":"value"}' and then use them in the DAG file as "{{ dag_run.conf['key'] }}" in a templated field.
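
A minimal sketch, in plain Python rather than Airflow, of what the --conf value becomes: the CLI argument (for example `airflow dags trigger --conf '{"key":"value"}' my_dag` in Airflow 2, where `my_dag` is a hypothetical DAG id) is a JSON object, and the templated expression reads one key from the resulting dict. The helpers `parse_conf` and `render_conf_key` are hypothetical; the second uses a plain dict lookup in place of real Jinja rendering.

```python
import json


def parse_conf(conf_arg):
    """Decode a --conf argument; it must be a JSON object (a dict)."""
    conf = json.loads(conf_arg)
    if not isinstance(conf, dict):
        raise ValueError("--conf must be a JSON object")
    return conf


def render_conf_key(conf, key):
    """Stand-in for rendering {{ dag_run.conf['key'] }}: a plain dict lookup."""
    return conf[key]


conf = parse_conf('{"key": "value"}')
print(render_conf_key(conf, "key"))  # value
```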

What is Xcom_pull in Airflow?

XCom push/pull just adds/retrieves a row in the xcom table of the Airflow metadata database, based on DAG id, execution date, task id, and key.
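
To make the "one row per lookup key" idea concrete, here is a toy in-memory model of that table. It is not Airflow's API: the real TaskInstance.xcom_pull takes task_ids, dag_id and key and fills in the run from the calling task instance. The DAG id `my_dag` and task id `list_files` are hypothetical.

```python
from datetime import datetime

# Toy stand-in for the xcom table: one row per
# (dag_id, execution_date, task_id, key). Not Airflow's real API.
xcom_table = {}


def xcom_push(dag_id, execution_date, task_id, key, value):
    """Add the row identified by the four lookup columns (overwrites)."""
    xcom_table[(dag_id, execution_date, task_id, key)] = value


def xcom_pull(dag_id, execution_date, task_id, key):
    """Retrieve the value of the matching row, or None if there is none."""
    return xcom_table.get((dag_id, execution_date, task_id, key))


run_date = datetime(2017, 7, 20)
xcom_push("my_dag", run_date, "list_files", "list_of_files", ["a.csv", "b.csv"])

print(xcom_pull("my_dag", run_date, "list_files", "list_of_files"))  # ['a.csv', 'b.csv']
print(xcom_pull("my_dag", datetime(2017, 7, 21), "list_files", "list_of_files"))  # None
```

A pull only finds a value when all four columns match, which is why a different execution date returns nothing.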

How do you pass data between operators in Airflow?

Airflow uses XComs to pass data between operators. If the flow is operator A -> operator B, then operator A must "push" a value to XCom, and operator B must "pull" this value from A if it wants to read it.
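
As a rough illustration of the A -> B hand-off, here is a plain-Python toy (not Airflow's API). It mirrors one real convention: a PythonOperator's return value is pushed under the key `return_value`, which is also the key `xcom_pull(task_ids=...)` reads by default. The helpers `run_task` and `xcom_pull` below are hypothetical stand-ins; in a real DAG, B only sees A's value if it runs after A, for example via `a >> b`.

```python
RETURN_KEY = "return_value"  # Airflow's default XCom key for return values
xcoms = {}  # (task_id, key) -> value, for one toy DAG run


def run_task(task_id, fn):
    """Run fn and push its return value under RETURN_KEY (skipped if None)."""
    result = fn()
    if result is not None:
        xcoms[(task_id, RETURN_KEY)] = result
    return result


def xcom_pull(task_ids, key=RETURN_KEY):
    """Read a value another task pushed; None if it was never pushed."""
    return xcoms.get((task_ids, key))


def operator_a():
    return ["a.csv", "b.csv"]  # "push" by returning a value


def operator_b():
    files = xcom_pull(task_ids="operator_a")  # "pull" what A pushed
    return len(files)


run_task("operator_a", operator_a)  # A runs first, like a >> b in a DAG
print(run_task("operator_b", operator_b))  # 2
```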


2 Answers

You can't access the XCom value in the DAG file itself; it is only available inside operators, for example by passing provide_context=True to the PythonOperator constructor in Airflow 1.x.

If you want to use data from an operator to shape the DAG structure itself, you need to perform the work your operator does outside of an operator, directly in the DAG file:

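from airflow import DAG

def get_file_list():
    hook = SomeHook()  # placeholder for a hook that can list the files
    return hook.run('something to get file list')  # must return the list of files

dag = DAG('tutorial', default_args=default_args)  # default_args defined elsewhere

for i, file in enumerate(get_file_list()):
    # Each generated task needs a unique task_id and must be attached to the DAG
    task = SomeOperator(
        task_id='process_file_{}'.format(i),
        params={'file': file},  # do something with the file passed as a parameter
        dag=dag,
    )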
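
The reason this works while looping over an XCom does not can be sketched with a toy model in plain Python (not Airflow code): the DAG file is executed to build the task list before any task has run, so the XCom store is still empty at that moment. The fixed list in `get_file_list` is a hypothetical stand-in for the `SomeHook` call.

```python
xcom_store = {}  # toy XCom store; filled only when tasks actually run


def tasks_from_xcom():
    """The question's idea: loop over an XCom value inside the DAG file."""
    files = xcom_store.get("list_of_files")  # parse time: nothing pushed yet
    return ["process_" + f for f in (files or [])]


def get_file_list():
    """The answer's approach: fetch the list directly at parse time.
    A fixed list stands in for the hypothetical SomeHook call."""
    return ["a.csv", "b.csv"]


def tasks_from_direct_call():
    return ["process_" + f for f in get_file_list()]


print(tasks_from_xcom())         # []
print(tasks_from_direct_call())  # ['process_a.csv', 'process_b.csv']
```

Because the scheduler re-parses DAG files regularly, whatever `get_file_list()` does runs on every parse, so it should be cheap.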
answered Nov 11 '22 at 00:11 by Matthijs Brouns


It is generally bad practice to access XCom from the DAG file itself rather than from a task in the DAG. That said, sometimes it is necessary, for example when creating DAGs dynamically.

Here is an example of how I pull some unrun jobs from within a DAG. I'm using this in the context of a SubDAG, so I can rest assured that the XCom will always contain the information, assuming the method is running.

    from airflow import settings

    xcom_unrun_jobs = None
    active_runs = parent_dag.get_active_runs()  # execution dates of running DAG runs
    if len(active_runs) > 0:
        # Last task instance of the most recent active run
        ti = parent_dag.get_task_instances(settings.Session, start_date=active_runs[-1])[-1]
        xcom_unrun_jobs = ti.xcom_pull(dag_id=parent_dag.dag_id, task_ids=unrun_job_task_id)
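
The control flow of that snippet can be sketched as a toy model in plain Python, assuming XCom rows are keyed by execution date and task id. The function name and the task id `find_unrun_jobs` are hypothetical, and this version picks the newest run with `max()` instead of relying on list order via `[-1]`.

```python
from datetime import datetime


def pull_from_latest_active_run(active_runs, xcom_rows, task_id):
    """Return the XCom value task_id pushed in the newest active run.

    active_runs: execution dates of the currently running DAG runs.
    xcom_rows: dict mapping (execution_date, task_id) -> pushed value.
    Returns None when no run is active, matching xcom_unrun_jobs = None.
    """
    if not active_runs:
        return None
    latest = max(active_runs)  # newest run, independent of list order
    return xcom_rows.get((latest, task_id))


rows = {
    (datetime(2017, 7, 19), "find_unrun_jobs"): ["job_1"],
    (datetime(2017, 7, 20), "find_unrun_jobs"): ["job_2", "job_3"],
}
runs = [datetime(2017, 7, 19), datetime(2017, 7, 20)]
print(pull_from_latest_active_run(runs, rows, "find_unrun_jobs"))  # ['job_2', 'job_3']
print(pull_from_latest_active_run([], rows, "find_unrun_jobs"))    # None
```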
answered Nov 10 '22 at 23:11 by melchoir55