I have been working with Airflow a lot recently, and a very common pattern I find is looping over some collection to create multiple tasks, much like the example_python_operator.py DAG found in the example DAGs folder on GitHub.
My question has to do with dynamically building up the collection the loop iterates over. Say you want to create a task for each of an unknown set of clients stored in a database, and you plan to query the database to populate your list. Something like this:
from airflow.operators.python_operator import PythonOperator

first_task = PythonOperator(
    task_id='some_upstream_task',
    provide_context=True,
    python_callable=some_upstream_task,
    dag=dag)

# This query runs every time the scheduler parses this file,
# not just on each scheduled DAG run.
clients = my_database_query()

for client in clients:
    task = PythonOperator(
        task_id='client_' + str(client),
        python_callable=some_function,
        dag=dag)
    task.set_upstream(first_task)
From what I have seen, this means that even if your DAG only runs weekly, your database is being polled every 30 seconds (each time the scheduler parses the file) for these clients. Even if you add an upstream operator that returns the clients via XComs and replace the my_database_query() with an xcom_pull(), you're still polling XComs every 30 seconds. This seems wasteful to me, so I'm wondering if there are any better patterns for this type of DAG?
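To make concrete what I mean by the XCom variant, here is a rough sketch, assuming Airflow 1.x, where airflow.models.XCom exposes a get_one classmethod (the dag_id below is hypothetical):

from datetime import datetime
from airflow.models import XCom

# Still module-level code, so the scheduler re-runs it on every parse:
# we've only swapped a database poll for an XCom (metadata DB) poll.
clients = XCom.get_one(
    execution_date=datetime.utcnow(),   # read the most recently pushed value
    task_id='some_upstream_task',
    dag_id='my_dag',                    # hypothetical dag_id
    include_prior_dates=True) or []

# ...then build the per-client tasks exactly as before.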
In your code sample we don't see the schedule interval of the DAG, but I'm assuming you have it scheduled, say @daily, and that you want the DB query to run once a day.
In Airflow, the DAG file is parsed periodically by the scheduler (hence the "every 30 seconds"). Any top-level Python code in that file, including your database query, runs on every parse; that is what causes the issue.
In your case, I would consider changing perspective: why not run the database query in a PostgresOperator and make it part of the DAG? Based on the output of that operator (which you can propagate via XCom, for example, or via a file in object storage), you can then have a downstream PythonOperator that runs a function not for one client but for all of them.
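A minimal sketch of that approach, assuming Airflow 1.x. Since PostgresOperator executes SQL but does not return rows, this sketch uses a PostgresHook inside a PythonOperator so the result can be pushed to XCom; the connection id my_db, the clients table, and the function names are all hypothetical:

from datetime import datetime
from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.python_operator import PythonOperator

dag = DAG('clients_dag',
          schedule_interval='@daily',
          start_date=datetime(2017, 1, 1))

def fetch_clients(**context):
    # Runs once per DAG run instead of on every scheduler parse.
    hook = PostgresHook(postgres_conn_id='my_db')    # hypothetical conn id
    rows = hook.get_records('SELECT client_id FROM clients')
    return [row[0] for row in rows]                  # pushed to XCom

def process_all_clients(**context):
    # Pull the list the upstream task pushed and handle every client here.
    clients = context['ti'].xcom_pull(task_ids='fetch_clients')
    for client in clients:
        pass  # your per-client logic goes here

fetch = PythonOperator(
    task_id='fetch_clients',
    provide_context=True,
    python_callable=fetch_clients,
    dag=dag)

process = PythonOperator(
    task_id='process_all_clients',
    provide_context=True,
    python_callable=process_all_clients,
    dag=dag)

process.set_upstream(fetch)

The trade-off is that you lose the per-client task granularity in the UI, but the database is only queried when the DAG actually runs.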