From the Airflow documentation:

> SubDAGs must have a schedule and be enabled. If the SubDAG's schedule is set to None or @once, the SubDAG will succeed without having done anything.
I understand the SubDagOperator is actually implemented as a BackfillJob, and thus we must provide a schedule_interval for the subdag. However, is there a way to get the semantic equivalent of schedule_interval="@once" for a subdag? I'm worried that if I set schedule_interval="@daily" for the subdag, it may run more than once if it takes longer than a day to finish.
```python
from airflow.models import DAG


def subdag_factory(parent_dag_name, child_dag_name, args):
    subdag = DAG(
        dag_id="{parent_dag_name}.{child_dag_name}".format(
            parent_dag_name=parent_dag_name, child_dag_name=child_dag_name
        ),
        schedule_interval="@daily",  # <--- this bit here
        default_args=args,
    )
    # ... do more stuff to the subdag here
    return subdag
```
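The factory builds the child dag_id as `<parent>.<child>`. In Airflow 1.x, SubDagOperator rejects a subdag whose dag_id is not exactly the parent's dag_id, a dot, and the operator's own task_id, so `child_dag_name` has to match that task_id. Below is a pure-Python sketch of that naming rule with no Airflow import; `subdag_dag_id` and `check_subdag_id` are hypothetical helpers, not Airflow functions.

```python
def subdag_dag_id(parent_dag_name, child_dag_name):
    """Build the child dag_id the same way subdag_factory does."""
    return "{parent}.{child}".format(parent=parent_dag_name, child=child_dag_name)


def check_subdag_id(parent_dag_id, task_id, subdag_id):
    """Hypothetical restatement of the naming rule SubDagOperator enforces.

    Raises ValueError unless subdag_id == "<parent_dag_id>.<task_id>".
    """
    expected = subdag_dag_id(parent_dag_id, task_id)
    if subdag_id != expected:
        raise ValueError(
            "subdag dag_id {!r} should be {!r}".format(subdag_id, expected)
        )


# Passes: the SubDagOperator task_id ("child") equals child_dag_name.
check_subdag_id("parent", "child", subdag_dag_id("parent", "child"))
```

In practice this means the task_id you pass to SubDagOperator and the `child_dag_name` you pass to the factory should be the same string.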
TL;DR: how do I fake "only run this subdag once per trigger of the parent DAG"?
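As far as I can tell from the Airflow 1.x source, `SubDagOperator.execute` runs the subdag as a backfill whose start_date and end_date are both the parent run's execution_date. If that holds for your version, a fixed interval such as @daily can only yield one schedule point per parent run. The sketch below mimics that window in plain Python; `backfill_dates` is a hypothetical stand-in for Airflow's own date-range logic and does not model cron expressions or the @once preset.

```python
from datetime import datetime, timedelta


def backfill_dates(start, end, interval):
    """Hypothetical helper: every schedule point from start to end, inclusive."""
    dates = []
    current = start
    while current <= end:
        dates.append(current)
        current += interval
    return dates


# Assumed behaviour: the parent's execution_date is passed as both bounds.
execution_date = datetime(2017, 6, 1)
runs = backfill_dates(execution_date, execution_date, timedelta(days=1))
print(runs)  # [datetime.datetime(2017, 6, 1, 0, 0)]

# A wider window (for comparison) would produce one run per day.
print(len(backfill_dates(datetime(2017, 6, 1), datetime(2017, 6, 3), timedelta(days=1))))  # 3
```

Under that assumption, how long the subdag takes to finish doesn't matter: the window is fixed when the parent's task starts, so there is no second daily slot for the backfill to pick up.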
concurrency: the maximum number of task instances allowed to run concurrently across all active DAG runs of a given DAG. This lets you allow one DAG to run 32 tasks at once while another DAG can run only 16.
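A simplified model of what that cap means: the limit is shared by every active run of the DAG, so running task instances are summed across runs before comparing against `concurrency`. This is a hypothetical sketch (`startable_tasks` is not scheduler code) and ignores pools, priorities and other limits the real scheduler applies.

```python
def startable_tasks(concurrency, running_per_run, queued):
    """How many queued task instances a DAG may start right now (simplified model).

    concurrency:     DAG-level cap on task instances running at once
    running_per_run: running task instances in each active DAG run
    queued:          task instances waiting to start
    """
    running = sum(running_per_run)  # the cap is shared by all active runs
    free_slots = max(0, concurrency - running)
    return min(free_slots, queued)


# Two active runs with 10 and 4 running tasks, 5 tasks waiting:
print(startable_tasks(16, [10, 4], queued=5))  # 2
print(startable_tasks(32, [10, 4], queued=5))  # 5
```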
SubDAGs were a legacy feature in Airflow that allowed users to implement reusable patterns of tasks in their DAGs. SubDAGs caused performance and functional issues for many users, and they have been deprecated as of Airflow 2.0 and will be removed entirely in a future release.
Airflow triggers a DAG automatically based on its scheduling parameters. You can also trigger a DAG manually, either from the Airflow UI or by running an Airflow CLI command through gcloud.
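For reference, the manual-trigger commands differ between Airflow 1.x (`airflow trigger_dag`) and 2.x (`airflow dags trigger`), and on Cloud Composer the same subcommand is forwarded through `gcloud composer environments run`. The sketch only builds the argument lists, which you could hand to `subprocess.run`; the DAG id, environment name and location are hypothetical placeholders, and you should check the exact syntax against your own Airflow and gcloud versions.

```python
def airflow_trigger_argv(dag_id, airflow_major=2):
    """Airflow CLI argv that triggers dag_id manually."""
    if airflow_major >= 2:
        return ["airflow", "dags", "trigger", dag_id]
    return ["airflow", "trigger_dag", dag_id]


def composer_trigger_argv(environment, location, dag_id, airflow_major=2):
    """The same trigger routed through gcloud for a Cloud Composer environment."""
    subcommand = ["dags", "trigger"] if airflow_major >= 2 else ["trigger_dag"]
    return (["gcloud", "composer", "environments", "run", environment,
             "--location", location] + subcommand + ["--", dag_id])


print(" ".join(airflow_trigger_argv("my_parent_dag", airflow_major=1)))
# airflow trigger_dag my_parent_dag
print(" ".join(composer_trigger_argv("my-environment", "us-central1", "my_parent_dag")))
# gcloud composer environments run my-environment --location us-central1 dags trigger -- my_parent_dag
```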
I find that `schedule_interval="@once"` works just fine for my subdags. Perhaps my version is outdated, but I've had more trouble with subdags failing even when all their tasks succeeded (or were skipped) than with the opposite.
Actual example code, running happily on my machine right now:
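```python
import logging

from airflow.models import DAG

# parent_name, child_name and dargs come from the surrounding factory function (not shown)
subdag_name = ".".join((parent_name, child_name))
logging.info(parent_name)
logging.info(subdag_name)
dag_subdag = DAG(
    dag_id=subdag_name,
    default_args=dargs,
    schedule_interval="@once",
)
```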
In fact, I originally built almost all my DAGs as glorified config files for my subdags. After some trial and error I'm not sure how good an idea that is, but the schedule interval was never a blocker for me.
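One way to read the "glorified config file" approach: the parent DAG file just lists its subdags, and code turns that list into the `<parent>.<child>` dag_ids and schedule the factory needs. The sketch below is hypothetical (the section layout, key names and `subdag_specs` are made up for illustration) and uses only the standard-library `configparser`; each resulting pair would then be passed to a factory like the ones above.

```python
import configparser

# Hypothetical config: one section per parent DAG.
CONFIG = """
[parent_dag]
subdags = extract, transform, load
schedule_interval = @once
"""


def subdag_specs(config_text, parent_name):
    """Return (dag_id, schedule_interval) pairs for every subdag of parent_name."""
    parser = configparser.ConfigParser()
    parser.read_string(config_text)
    section = parser[parent_name]
    children = [name.strip() for name in section["subdags"].split(",")]
    schedule = section.get("schedule_interval", "@once")  # default if the key is absent
    return [(".".join((parent_name, child)), schedule) for child in children]


print(subdag_specs(CONFIG, "parent_dag"))
# [('parent_dag.extract', '@once'), ('parent_dag.transform', '@once'), ('parent_dag.load', '@once')]
```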
I'm running a relatively recent build of 1.8 with few customizations. I've been following the example dag suggestion of keeping my subdags in a folder inside the dags folder so they don't show up in the DagBag.