I have been evaluating Airflow. I have a use case where one workflow runs every hour to compute hourly aggregates of the data, and another runs every day to compute daily aggregates of the same data. Is it possible to create a combined workflow where the daily aggregate runs only if all of the hourly aggregates in the past day have succeeded? I have seen that you can create sub-DAGs, but can the two DAGs run at different frequencies? If yes, how?
**concurrency:** The maximum number of task instances allowed to run concurrently across all active DAG runs for a given DAG. This lets you allow one DAG to run 32 tasks at once while another DAG is limited to 16.
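For example, in Airflow versions where this parameter is still called `concurrency` (it was later renamed `max_active_tasks`), it is set directly on the DAG object. A minimal sketch, with placeholder dag_ids and dates:

```python
from datetime import datetime
from airflow import DAG

heavy_dag = DAG(
    dag_id="heavy_etl",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@hourly",
    concurrency=32,  # at most 32 task instances running at once across all active runs
)

light_dag = DAG(
    dag_id="light_etl",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@hourly",
    concurrency=16,  # this DAG is capped at 16
)
```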
One method for dynamically generating DAGs is to have a single Python file which generates DAGs based on some input parameter(s) (e.g. a list of APIs or tables). A common use case for this is an ETL or ELT-type pipeline where there are many data sources or destinations.
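As a rough sketch of that pattern (the table names and task body are placeholders), a single file can register several DAGs by assigning them into `globals()`, which is where Airflow's DAG-file parser looks for DAG objects:

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

# Hypothetical list of source tables; in practice this might come from a config file.
TABLES = ["orders", "customers", "payments"]

def make_dag(table):
    dag = DAG(
        dag_id=f"etl_{table}",
        start_date=datetime(2023, 1, 1),
        schedule_interval="@daily",
    )
    with dag:
        PythonOperator(
            task_id=f"load_{table}",
            # Bind `table` as a default arg to avoid Python's late-binding closure pitfall.
            python_callable=lambda table=table: print(f"loading {table}"),  # placeholder ETL step
        )
    return dag

# Airflow discovers DAGs by scanning module-level globals, so register each one there.
for _table in TABLES:
    globals()[f"etl_{_table}"] = make_dag(_table)
```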
To schedule a DAG, Airflow simply takes the last execution date and adds the schedule interval; once that time has passed, it runs the DAG. You cannot simply update the start date. A common workaround is to edit your start date and schedule interval, rename your DAG (e.g. xxxx_v2.py), and redeploy it.
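A minimal sketch of that workaround (the file name, dag_id, and dates are illustrative):

```python
# xxxx_v2.py -- redeployed under a new dag_id so the scheduler has no
# prior run history and starts cleanly from the new start_date.
from datetime import datetime, timedelta
from airflow import DAG

dag = DAG(
    dag_id="xxxx_v2",                      # was "xxxx"; renaming resets the schedule
    start_date=datetime(2023, 1, 1),       # the new start date
    schedule_interval=timedelta(hours=1),  # the new interval
)
```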
Not sure exactly how you want this to work, but while there isn't a single built-in way of doing this, there are a few ways you could use Airflow's extensive suite of operators to build such a DAG.
For example, you could make the hourly DAG's tasks use depends_on_past, and then use a Python branch operator so that the daily aggregation task/DAG is triggered at the end of the hourly DAG's last run of the day. Check out the BranchPythonOperator and the TriggerDagRunOperator.
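A minimal sketch of that idea, assuming Airflow 2 import paths and hypothetical dag_ids; the 23:00 boundary check is an assumption about how "last run of the day" is detected:

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator  # EmptyOperator in newer versions
from airflow.operators.python import BranchPythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

def is_last_run_of_day(execution_date, **_):
    # Branch to the trigger task only for the 23:00 run of each day.
    return "trigger_daily" if execution_date.hour == 23 else "skip"

with DAG(
    dag_id="hourly_aggregates",               # hypothetical hourly DAG
    start_date=datetime(2023, 1, 1),
    schedule_interval="@hourly",
    default_args={"depends_on_past": True},   # each hourly run waits on the previous one
) as dag:
    aggregate = DummyOperator(task_id="aggregate")  # placeholder for the real hourly work

    branch = BranchPythonOperator(
        task_id="branch_on_hour",
        python_callable=is_last_run_of_day,
    )

    trigger_daily = TriggerDagRunOperator(
        task_id="trigger_daily",
        trigger_dag_id="daily_aggregates",    # hypothetical daily DAG
    )

    skip = DummyOperator(task_id="skip")

    aggregate >> branch >> [trigger_daily, skip]
```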
You could also create a sensor for the daily aggregator to make sure that all hourly DAG runs for that day have succeeded. Check out the ExternalTaskSensor for reference.
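A minimal sketch of that approach, with the same hypothetical dag_ids and task names as above. It assumes an Airflow 2 version where `execution_date_fn` may return a list of logical dates, so one sensor can wait on all 24 hourly runs that fall inside the daily run's interval:

```python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.sensors.external_task import ExternalTaskSensor

with DAG(
    dag_id="daily_aggregates",                # hypothetical daily DAG
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
) as dag:
    # With classic Airflow semantics, the daily run with logical date D starts
    # at D + 1 day and covers the hourly runs with logical dates D 00:00..D 23:00.
    wait_for_hourlies = ExternalTaskSensor(
        task_id="wait_for_hourlies",
        external_dag_id="hourly_aggregates",  # hypothetical hourly DAG
        external_task_id="aggregate",         # its final task
        # Check all 24 hourly logical dates inside this daily interval.
        execution_date_fn=lambda dt, **_: [dt + timedelta(hours=h) for h in range(24)],
        mode="reschedule",                    # free the worker slot while waiting
        timeout=6 * 60 * 60,
    )

    daily_aggregate = DummyOperator(task_id="daily_aggregate")  # placeholder for the real work

    wait_for_hourlies >> daily_aggregate
```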