Dynamically building collection to loop over in Airflow dag

Tags: python, airflow

I have been working with Airflow a lot recently, and I'm finding that a very common pattern is to loop over some collection to create multiple tasks, very similar to the example_python_operator.py DAG found in the example dags folder on GitHub.

My question has to do with dynamically building up the collection the loop iterates over. Let's say you want to create a task for each client in an unknown set of clients stored in a database, and you plan to query the database to populate your list. Something like this:

first_task = PythonOperator(
    task_id='some_upstream_task',
    provide_context=True,
    python_callable=some_upstream_task,
    dag=dag)

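# Module-level call: the scheduler executes this on every parse cycle,
# not only when the DAG itself runs.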
clients = my_database_query()

for client in clients:
    task = PythonOperator(
        task_id='client_' + str(client),
        python_callable=some_function,
        dag=dag)

    task.set_upstream(first_task)

From what I have seen, this means that even if your DAG only runs weekly, your database is being polled every 30 seconds for these clients. Even if you set an upstream operator to return the clients via XCom and replace my_database_query() with an xcom_pull() (roughly as sketched below), you're still polling XComs every 30 seconds. This seems wasteful to me, so I'm wondering if there are any better patterns for this type of DAG?
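Roughly, that XCom variant would look like this (I'm using XCom.get_one from airflow.models as the parse-time accessor; the dag_id and key values here are just illustrative):

from datetime import datetime

from airflow.models import XCom

# Still module-level code: the scheduler runs this on every parse cycle,
# so the metadata database is polled roughly every 30 seconds no matter
# how rarely the DAG itself runs.
clients = XCom.get_one(
    execution_date=datetime.utcnow(),
    key='return_value',
    task_id='some_upstream_task',
    dag_id='my_dag',  # illustrative dag_id
    include_prior_dates=True) or []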

moku asked Mar 06 '18 20:03

1 Answer

Your code sample doesn't show the schedule interval of the DAG, but I'm assuming that you have it scheduled, let's say @daily, and that you want the DB query to run once a day.

In Airflow, the DAG file is parsed periodically by the scheduler (hence the "every 30 seconds"), and any top-level Python code runs on every parse. That is what makes your module-level query an issue.

In your case, I would consider changing perspective: why not run the database query in a PostgresOperator and make it part of the DAG? Based on the output of that operator (which you can propagate via XCom, for example, or via a file in object storage), you can then have a downstream PythonOperator that runs a function not for one client but for all of them.
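A minimal sketch of that shape, assuming 1.x-era Airflow, a Postgres connection id of my_db, and an illustrative clients table. One caveat: the stock PostgresOperator does not push its query results to XCom, so this sketch fetches the rows in a PythonOperator via PostgresHook instead:

from datetime import datetime

from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id='client_processing',
    start_date=datetime(2018, 3, 1),
    schedule_interval='@daily')

def fetch_clients(**context):
    # Runs inside a task, once per DAG run, instead of at parse time.
    hook = PostgresHook(postgres_conn_id='my_db')  # assumed connection id
    records = hook.get_records('SELECT client_id FROM clients')  # assumed query
    # The return value is automatically pushed to XCom.
    return [row[0] for row in records]

def process_all_clients(**context):
    # Pull the client list pushed by the upstream task and handle every
    # client inside this single task, rather than one task per client.
    clients = context['ti'].xcom_pull(task_ids='fetch_clients')
    for client in clients or []:
        print('processing client %s' % client)

fetch = PythonOperator(
    task_id='fetch_clients',
    provide_context=True,
    python_callable=fetch_clients,
    dag=dag)

process = PythonOperator(
    task_id='process_all_clients',
    provide_context=True,
    python_callable=process_all_clients,
    dag=dag)

process.set_upstream(fetch)

This keeps parsing cheap: the scheduler only ever builds two static tasks, and the database is hit once per DAG run rather than on every parse cycle.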

louis_guitton answered Sep 29 '22 13:09