 

How to skip tasks on Airflow?

I'm trying to understand whether Airflow supports skipping tasks in a DAG for ad-hoc executions.

Let's say my DAG looks like this: task1 >> task2 >> task3 >> task4

I would like to trigger my DAG manually and have it start from task3. What is the best way to do that?

I've read about the ShortCircuitOperator, but I'm looking for a more ad-hoc solution that can be applied once the execution is triggered.
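For context, the ShortCircuitOperator behaviour mentioned above can be modelled in a few lines of plain Python: a "gate" task evaluates a condition, and if the result is falsy every task after it in the chain is marked skipped while the gate itself still succeeds. This is a toy simulation, not Airflow code; the function name `run_chain_with_short_circuit` and the "success"/"skipped" strings are illustrative assumptions.

```python
from typing import Callable, Dict, List


def run_chain_with_short_circuit(
    tasks: List[str], gate: str, condition: Callable[[], bool]
) -> Dict[str, str]:
    """Toy model of short-circuit semantics on a linear chain of task ids.

    Tasks up to and including `gate` succeed; `gate` evaluates `condition`,
    and if it is falsy every task after `gate` is marked "skipped".
    """
    states: Dict[str, str] = {}
    gate_index = tasks.index(gate)
    for task in tasks[: gate_index + 1]:
        states[task] = "success"
    proceed = bool(condition())
    for task in tasks[gate_index + 1 :]:
        states[task] = "success" if proceed else "skipped"
    return states


chain = ["task1", "task2", "task3", "task4"]
print(run_chain_with_short_circuit(chain, "task2", lambda: False))
# {'task1': 'success', 'task2': 'success', 'task3': 'skipped', 'task4': 'skipped'}
```

The model makes the asker's concern visible: the skip decision lives in a task placed in the DAG ahead of time, so it only helps if that gate task and its condition already exist when the run is triggered.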

Thanks!

asked Sep 05 '18 17:09 by Maayan


2 Answers

You can incorporate the SkipMixin that the ShortCircuitOperator uses under the hood to skip downstream tasks.

from airflow.models import BaseOperator, SkipMixin
from airflow.utils.decorators import apply_defaults


class mySkippingOperator(BaseOperator, SkipMixin):

    @apply_defaults
    def __init__(self,
                 condition,
                 *args,
                 **kwargs):
        super().__init__(*args, **kwargs)
        # `condition` is a plain value fixed when the DAG is defined;
        # it is not re-evaluated for each run
        self.condition = condition

    def execute(self, context):

        if self.condition:
            self.log.info('Proceeding with downstream tasks...')
            return

        self.log.info('Skipping downstream tasks...')

        # All tasks downstream of this one (transitively), not just direct children
        downstream_tasks = context['task'].get_flat_relatives(upstream=False)

        self.log.debug("Downstream task_ids %s", downstream_tasks)

        if downstream_tasks:
            # Mark every downstream task instance in this DAG run as "skipped"
            self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)

        self.log.info("Done.")
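The `get_flat_relatives(upstream=False)` call collects every downstream relative of the task, not only its direct children, which is why the skip reaches the end of the chain. A plain-Python approximation of that traversal follows, assuming the DAG is given as a dict mapping each task id to its direct downstream ids; the `flat_downstream` name, the dict layout and the extra `report` task are hypothetical, not Airflow internals.

```python
from collections import deque
from typing import Dict, List, Set


def flat_downstream(graph: Dict[str, List[str]], task_id: str) -> Set[str]:
    """Return every task id reachable downstream of `task_id` (excluding it).

    `graph` maps each task id to the ids of its direct downstream tasks.
    """
    seen: Set[str] = set()
    queue = deque(graph.get(task_id, []))
    while queue:  # breadth-first walk over downstream edges
        current = queue.popleft()
        if current in seen:
            continue
        seen.add(current)
        queue.extend(graph.get(current, []))
    return seen


# task1 >> task2 >> task3 >> task4, plus a hypothetical side branch task2 >> report
graph = {
    "task1": ["task2"],
    "task2": ["task3", "report"],
    "task3": ["task4"],
}
print(sorted(flat_downstream(graph, "task2")))  # ['report', 'task3', 'task4']
```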
answered Nov 10 '22 06:11 by Ben Gregory


Yes, there is another ad-hoc way to do this.

You need to raise AirflowSkipException from inside the task; the task instance is then marked as skipped instead of failed.

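from airflow.exceptions import AirflowSkipException
from airflow.operators.python import PythonOperator  # Airflow 1.10: airflow.operators.python_operator


def execute():
    # `condition` stands in for whatever check decides this task should not run
    if condition:
        # Marks this task instance as "skipped" rather than "failed"
        raise AirflowSkipException("Condition met, skipping this task")
    # ... normal task logic goes here ...


task = PythonOperator(task_id='task', python_callable=execute, dag=some_dag)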
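To see how raising a skip exception interacts with the rest of a chain, here is a toy runner in plain Python. It is a sketch under stated assumptions, not Airflow's scheduler: `SkipTask` stands in for `AirflowSkipException`, the per-run `conf` dict and its `skip_tasks` key are hypothetical (in a real DAG something like `dag_run.conf` could play that role), and the runner approximates the default `all_success` trigger rule for a linear chain only.

```python
from typing import Callable, Dict, List, Tuple


class SkipTask(Exception):
    """Stand-in for airflow.exceptions.AirflowSkipException in this toy model."""


def run_linear_chain(
    tasks: List[Tuple[str, Callable[[dict], None]]], conf: dict
) -> Dict[str, str]:
    """Run tasks in order, approximating the default all_success trigger rule.

    A task whose upstream was skipped is itself skipped; one whose upstream
    failed becomes "upstream_failed". Otherwise the callable runs, and raising
    SkipTask marks it "skipped" rather than "failed".
    """
    states: Dict[str, str] = {}
    previous = "success"
    for task_id, fn in tasks:
        if previous == "skipped":
            state = "skipped"
        elif previous in ("failed", "upstream_failed"):
            state = "upstream_failed"
        else:
            try:
                fn(conf)
                state = "success"
            except SkipTask:
                state = "skipped"
            except Exception:
                state = "failed"
        states[task_id] = state
        previous = state
    return states


def make_task(task_id: str) -> Callable[[dict], None]:
    def execute(conf: dict) -> None:
        # Hypothetical per-run setting: task ids to skip for this run only
        if task_id in conf.get("skip_tasks", []):
            raise SkipTask(f"{task_id} skipped for this run")
        # ... the task's real work would go here ...

    return execute


chain = [(t, make_task(t)) for t in ["task1", "task2", "task3", "task4"]]
print(run_linear_chain(chain, {"skip_tasks": ["task2"]}))
# {'task1': 'success', 'task2': 'skipped', 'task3': 'skipped', 'task4': 'skipped'}
```

The output reflects a real Airflow behaviour worth keeping in mind for the original question: under the default `all_success` trigger rule, a skipped upstream task causes its downstream tasks to be skipped as well, so skipping task1 and task2 this way would presumably also skip task3 unless task3's trigger rule is changed.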
answered Nov 10 '22 07:11 by Sourav Roy Chowdhury