 

Airflow - creating dynamic Tasks from XCOM

Tags:

python

airflow

I'm attempting to generate a set of dynamic tasks from an XCOM variable. In the XCOM I'm storing a list, and I want to use each element of the list to dynamically create a downstream task.

My use case is that I have an upstream operator that checks an SFTP server for files and returns a list of file names matching specific criteria. I want to create a dynamic downstream task for each of the file names returned.

I've simplified it to the code below, and while it works, I feel like it's not an idiomatic Airflow solution. In my real use case, instead of using the pusher function, I would write a Python function, called from a PythonOperator, that pulls the value from XCOM and returns it.
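A minimal sketch of the pull-and-return function described above, assuming Airflow 1.x, where a PythonOperator with provide_context=True (as set in the question's default_args) receives the task instance as context["ti"]. The upstream task id list_sftp_files is a hypothetical placeholder.

```python
# Hypothetical task_id of the upstream task that lists the SFTP files.
UPSTREAM_TASK_ID = "list_sftp_files"


def pull_file_list(**context):
    """Return the file list that the upstream task pushed to XCOM.

    With provide_context=True, Airflow 1.x passes the task instance in as
    context["ti"]; xcom_pull(task_ids=...) reads the value the upstream
    callable returned.
    """
    files = context["ti"].xcom_pull(task_ids=UPSTREAM_TASK_ID)
    # Fall back to an empty list if the upstream task pushed nothing.
    return files or []
```

It would be wired in as PythonOperator(task_id="pull_file_list", python_callable=pull_file_list, dag=dag). Because it runs at task-execution time, its return value is only available to other running tasks, not to the module-level code that builds the DAG.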

I understand that I could create a custom operator that combines both, but I don't think creating a throwaway operator is good practice, and I'm hoping there's another solution.

from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow import DAG
from datetime import datetime, timedelta

default_args = {
    "owner": "test",
    "depends_on_past": False,
    "start_date": datetime(2018, 10, 27),
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "email_on_success": False,
    "retries": 0,
    "provide_context": True
}

dag = DAG("test",  default_args=default_args, schedule_interval="@daily", catchup=False)


def pusher(**context):
    return ['a', 'b', 'c', 'd', 'e']

pusher_task = PythonOperator(
    task_id='pusher_task',
    dag=dag,
    python_callable=pusher  
)

def bash_wrapper(task, **context):
    return BashOperator(
        task_id='dynamic'+task,
        dag=dag,
        bash_command='date'
    )

end = BashOperator(task_id='end', dag=dag, bash_command='echo task has ended')


pusher_task >> [bash_wrapper(task) for task in pusher()] >> end
asked Apr 14 '19 at 06:04 by Jackson


1 Answer

I wouldn't do what you're trying to achieve mainly because:

  1. XCOM values are state generated at runtime
  2. DAG structure is determined at parse time

Even if you use something like the following to access XCOM values generated by some upstream task:

from datetime import datetime

from airflow import DAG
from airflow.models import TaskInstance
from airflow.operators.python_operator import PythonOperator
from airflow.utils.db import provide_session

dag = DAG(...)

@provide_session
def get_files_list(session=None):
    # upstream_task_id: task_id of the task that pushed the file list
    execution_date = dag.previous_schedule(datetime.now())

    # Find previous task instance:
    ti = session.query(TaskInstance).filter(
        TaskInstance.dag_id == dag.dag_id,
        TaskInstance.execution_date == execution_date,
        TaskInstance.task_id == upstream_task_id).first()
    if ti:
        # Pull that specific task's return value, not just any XCOM in the run
        files_list = ti.xcom_pull(task_ids=upstream_task_id)
        if files_list:
            return files_list
    # Return default state:
    return {...}


files_list = get_files_list()
# Generate tasks based on upstream task state:
task = PythonOperator(
    ...,
    xcom_push=True,
    dag=dag)

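But this would behave very strangely, because DAG parsing and task execution are not synchronised in the way you want: the scheduler re-parses the DAG file on its own schedule, independently of when tasks run, so the set of tasks can change from one parse to the next depending on which XCOM value happens to exist at that moment.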
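To make the timing problem concrete, here is a deliberately simplified toy model in plain Python. It is not Airflow's scheduler code: a dict stands in for the XCOM table, and the hypothetical parse_dag() stands in for the scheduler re-reading the DAG file.

```python
# Toy model only: a dict stands in for Airflow's XCOM table and parse_dag()
# stands in for the scheduler re-reading the DAG file. Names are hypothetical.
xcom_store = {}


def parse_dag(default=("default",)):
    """Build a task list from whatever XCOM value exists at parse time."""
    files = xcom_store.get("list_files", list(default))
    return ["process_" + name for name in files]


# 1. The scheduler parses the DAG file before the upstream task has run.
tasks_at_first_parse = parse_dag()

# 2. The upstream task runs and pushes its result to "XCOM".
xcom_store["list_files"] = ["a.csv", "b.csv"]

# 3. A later parse builds a different set of tasks from the same file.
tasks_at_next_parse = parse_dag()

print(tasks_at_first_parse)  # ['process_default']
print(tasks_at_next_parse)   # ['process_a.csv', 'process_b.csv']
```

The DAG file itself never changed; only the moment it was parsed did, which is exactly the kind of non-determinism you want to keep out of a DAG's structure.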

If the main reason you want to do this is to parallelise file processing, I'd have a static number of processing tasks (determined by the required parallelism), each of which reads the file list from the upstream task's XCOM value and operates on its own portion of that list.
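A minimal sketch of that static fan-out, assuming Airflow 1.10-style PythonOperator with provide_context=True and op_kwargs. NUM_WORKERS, list_sftp_files, and process_partition are hypothetical names; the Airflow wiring is shown in comments so the partitioning logic can run on its own.

```python
UPSTREAM_TASK_ID = "list_sftp_files"  # hypothetical upstream task id
NUM_WORKERS = 4  # fixed at parse time, chosen for the parallelism you need


def files_for_worker(files, worker_index, num_workers):
    """Round-robin slice: worker i gets files i, i + n, i + 2n, ..."""
    return files[worker_index::num_workers]


def process_partition(worker_index, num_workers=NUM_WORKERS, **context):
    # Read the full list at run time, then keep only this worker's share.
    files = context["ti"].xcom_pull(task_ids=UPSTREAM_TASK_ID) or []
    mine = files_for_worker(files, worker_index, num_workers)
    # Hypothetical per-file processing would go here.
    return mine


# Wiring (Airflow 1.10-style, commented out so this sketch runs without Airflow):
# workers = [
#     PythonOperator(
#         task_id="process_files_%d" % i,
#         python_callable=process_partition,
#         op_kwargs={"worker_index": i},
#         provide_context=True,
#         dag=dag,
#     )
#     for i in range(NUM_WORKERS)
# ]
# list_files_task >> workers >> end
```

Round-robin slicing keeps each worker's share within one file of the others by count, and the set of tasks never changes between parses; only the data each task sees at run time does.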

Another option is parallelising file processing with a distributed computing framework such as Apache Spark.

answered Sep 19 '22 at 08:09 by Michael Spector