I'm trying to understand whether Airflow supports skipping tasks in a DAG for ad-hoc executions.
Let's say my DAG graph looks like this: task1 > task2 > task3 > task4
I would like to start my DAG manually from task3. What is the best way of doing that?
I've read about ShortCircuitOperator, but I'm looking for a more ad-hoc solution that can be applied once the execution is triggered.
Thanks!
You can incorporate the SkipMixin that the ShortCircuitOperator uses under the hood to skip downstream tasks.
from airflow.models import BaseOperator, SkipMixin
from airflow.utils.decorators import apply_defaults


class mySkippingOperator(BaseOperator, SkipMixin):
    """Skips every downstream task when `condition` is falsy."""

    @apply_defaults
    def __init__(self, condition, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.condition = condition

    def execute(self, context):
        if self.condition:
            self.log.info('Proceeding with downstream tasks...')
            return

        self.log.info('Skipping downstream tasks...')
        # Collect all tasks downstream of this one, direct and indirect.
        downstream_tasks = context['task'].get_flat_relatives(upstream=False)
        self.log.debug("Downstream task_ids %s", downstream_tasks)

        if downstream_tasks:
            # Mark the collected tasks as skipped for this DAG run.
            self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)

        self.log.info("Done.")
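To make it clearer which tasks end up skipped, here is a minimal pure-Python sketch of the "flat downstream" idea behind get_flat_relatives(upstream=False): every task reachable from the current one, not only its direct children. The function name flat_downstream and the dictionary graph are hypothetical and only model the DAG from the question; this is not Airflow's own implementation.

```python
from collections import deque


def flat_downstream(graph, start):
    """Return every task reachable downstream of `start` in an acyclic graph."""
    seen = set()
    queue = deque(graph.get(start, []))
    while queue:
        task = queue.popleft()
        if task in seen:
            continue
        seen.add(task)
        # Follow the children of this task as well (indirect relatives).
        queue.extend(graph.get(task, []))
    return seen


# Hypothetical model of the question's DAG: task1 > task2 > task3 > task4
graph = {"task1": ["task2"], "task2": ["task3"], "task3": ["task4"], "task4": []}

print(sorted(flat_downstream(graph, "task2")))  # ['task3', 'task4']
```

So if the skipping operator sat where task2 is, both task3 and task4 would be marked as skipped, which is why this approach cuts off the tail of a DAG rather than its head.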
Yes, you can also do this on an ad-hoc basis.
You need to raise AirflowSkipException inside the task's callable:
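from airflow.exceptions import AirflowSkipException
# Airflow 1.10 import path; in Airflow 2 it is airflow.operators.python
from airflow.operators.python_operator import PythonOperator

def execute():
    # `condition` and `some_dag` are placeholders for your own check and DAG
    if condition:
        raise AirflowSkipException

task = PythonOperator(task_id='task', python_callable=execute, dag=some_dag)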
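For the "start from task3" case in the question, the condition could be driven by parameters passed when the run is triggered, which Airflow exposes to tasks as context['dag_run'].conf. Below is a rough sketch of just the decision logic, kept free of Airflow imports so it can be run on its own. The helper should_skip and the conf key "start_from" are assumptions made for illustration, not part of Airflow.

```python
def should_skip(task_id, order, conf):
    """Return True when `task_id` comes before the requested start task."""
    # "start_from" is a hypothetical key chosen for this sketch.
    start_from = (conf or {}).get("start_from")
    if start_from is None or start_from not in order or task_id not in order:
        return False  # no valid start task requested: run everything
    return order.index(task_id) < order.index(start_from)


order = ["task1", "task2", "task3", "task4"]
conf = {"start_from": "task3"}  # stands in for context['dag_run'].conf

print([t for t in order if should_skip(t, order, conf)])  # ['task1', 'task2']
```

Each task's callable would then raise AirflowSkipException when should_skip returns True. One caveat worth checking against your Airflow version: with the default all_success trigger rule a skipped upstream task generally causes its downstream tasks to be skipped too, so task3 would probably need a more permissive trigger rule (for example none_failed) to actually run after task1 and task2 are skipped.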