Dynamic task definition in Airflow

I’m currently trying to use Airflow to orchestrate a process where some operators are defined dynamically and depend on the output of another (earlier) operator.

In the code below, t1 updates a text file with new records (these are actually read from an external queue, but for simplicity I hard-coded them as A, B and C here). Then I want to create a separate operator for each record read from that text file. These operators will create directories A, B and C, respectively, and in the Airflow UI will appear as separate bash tasks Create_directory_A, Create_directory_B and Create_directory_C.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

dag = DAG('Test_DAG',
          description="Lorem ipsum.",
          start_date=datetime(2017, 3, 20),
          schedule_interval=None,
          catchup=False)


def create_text_file(list_of_rows):
    text_file = open('text_file.txt', "w")
    for row in list_of_rows:
        text_file.write(row + '\n')
    text_file.close()


def read_text():
    txt_file = open('text_file.txt', 'r')
    return [element for element in txt_file.readlines()]


t1 = PythonOperator(
    task_id='Create_text_file',
    python_callable=create_text_file,
    op_args=[['A', 'B', 'C']],
    dag=dag
)

for row in read_text():
    t2 = BashOperator(
        task_id='Create_directory_{}'.format(row),
        bash_command="mkdir {{params.dir_name}}",
        params={'dir_name': row},
        dag=dag
    )

    t1 >> t2

In Airflow’s documentation I can see that the scheduler will execute it [the DAG] periodically to reflect the changes if any. Does that mean there is a risk that, even though my t1 operator is executed before t2, the bash operators are created from the list of records as it was before the update (since that's when the DAG was evaluated)?

asked Feb 16 '18 by Dawid

2 Answers

You cannot create tasks dynamically that depend on the output of an upstream task. You're mixing up schedule time and execution time. A DAG definition and its tasks are created at schedule time; a DAG run and its task instances are created at execution time. Only a task instance can produce output.

The Airflow scheduler will build the dynamic graph with whatever text_file.txt contains at schedule time. These tasks are then shipped off to the workers.

A worker will eventually execute the t1 task instance and create a new text_file.txt, but at this point, the list of t2 tasks has already been calculated by the scheduler and sent off to the workers.

So, whatever the latest t1 task instance dumps into text_file.txt will be used the next time the scheduler decides it's time to run the DAG.

If your task is fast and your workers are not backlogged, that will be the contents from the previous DAG run. If they are backlogged, text_file.txt contents may be stale, and if you're really unlucky, the scheduler reads the file while a task instance is writing to it, and you'll get incomplete data from read_text().
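If the rows don't strictly need to appear as separate tasks in the UI, one way around this is to push the per-row work down to execution time, so a single task reads text_file.txt when it runs and creates all the directories itself. A minimal sketch (the DAG id, task ids and the use of os.makedirs are illustrative, not taken from the question or the answer above):

import os
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG('Test_DAG_runtime',          # illustrative DAG id
          schedule_interval=None,
          start_date=datetime(2017, 3, 20),
          catchup=False)


def create_text_file(list_of_rows):
    with open('text_file.txt', 'w') as text_file:
        for row in list_of_rows:
            text_file.write(row + '\n')


def create_directories():
    # The file is read when this task instance runs, not when the DAG is parsed.
    with open('text_file.txt') as txt_file:
        for row in txt_file:
            name = row.strip()
            if name:
                os.makedirs(name, exist_ok=True)


t1 = PythonOperator(
    task_id='Create_text_file',
    python_callable=create_text_file,
    op_args=[['A', 'B', 'C']],
    dag=dag
)

t2 = PythonOperator(
    task_id='Create_directories',
    python_callable=create_directories,
    dag=dag
)

t1 >> t2

The trade-off is that you no longer get Create_directory_A/B/C as separate boxes in the UI; the shape of the graph is fixed, which is exactly why it can no longer go stale.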

answered Nov 07 '22 by Erik Cederstrand


This code will actually create one instance of t2, which will be a BashOperator built with the last row it gets from read_text(). I am certain this is not what you want.

A better approach would be to create a separate DAG for your t2 operator that is triggered when the file is written by t1. There's an SO question on this that might help: Apache Airflow - trigger/schedule DAG rerun on completion (File Sensor)
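A rough sketch of that two-DAG idea, assuming Airflow 1.x import paths and a FileSensor waiting on text_file.txt (the DAG id, schedule and sensor arguments below are illustrative, and FileSensor relies on a filesystem connection such as the default fs_default):

import os
from datetime import datetime

from airflow import DAG
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.python_operator import PythonOperator

# Runs periodically; the sensor blocks until text_file.txt shows up.
# Alternatively, trigger this DAG from the first one with TriggerDagRunOperator.
dag = DAG('Create_directories_DAG',
          schedule_interval='*/10 * * * *',
          start_date=datetime(2017, 3, 20),
          catchup=False)

wait_for_file = FileSensor(
    task_id='Wait_for_text_file',
    filepath='text_file.txt',      # file written by t1 in the other DAG
    fs_conn_id='fs_default',
    poke_interval=30,
    dag=dag
)


def create_directories():
    with open('text_file.txt') as txt_file:
        for row in txt_file:
            name = row.strip()
            if name:
                os.makedirs(name, exist_ok=True)
    # You may also want to remove or rename the file here so the next run
    # waits for a fresh one.


create_dirs = PythonOperator(
    task_id='Create_directories',
    python_callable=create_directories,
    dag=dag
)

wait_for_file >> create_dirs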

answered Nov 07 '22 by Steve