
How to test airflow dag in unittest?

I am trying to test a DAG with more than one task in a test environment. I was able to test a single task in the DAG, but I want to create several tasks in the DAG and kick off the first one. To test one task in a DAG I am using

task1.run()

which executes fine. But the same approach does not work when I have many tasks chained one after another downstream in the DAG.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('tutorial', default_args=default_args)

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

t2.set_upstream(t1)

t1.run()  # This executes only the first task.

To run the second task I would have to call t2.run(), which I don't want to do since I am designing a DAG. How can I achieve this?

asked Apr 24 '18 by mad_

1 Answer

I'm not totally sure I understand your question yet, but I'll take a stab at starting an answer.

If your goal is just to run the DAG or a subset of its tasks manually, you can do so from the CLI, for example:

  • $ airflow run ... - run a task instance
  • $ airflow test ... - test a task instance without checking dependencies or recording state in the database
  • $ airflow trigger_dag ... - trigger a specific DAG run of a DAG

CLI docs - https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html
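For the tutorial DAG in the question, the invocations would look something like this (the execution date here is illustrative; adjust the DAG id, task ids, and date to your environment):

```shell
# Run a single task instance for a given execution date
airflow run tutorial print_date 2018-04-24

# Test a task instance without checking dependencies or recording state
airflow test tutorial sleep 2018-04-24

# Trigger a full DAG run, letting the scheduler handle downstream tasks
airflow trigger_dag tutorial
```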

I think the airflow run command is the one most relevant to your use case.

At runtime, scheduling the tasks in a DAG and running downstream tasks once their upstream dependencies are met is handled automatically by the scheduler and executor. You shouldn't need to call run() anywhere in your DAG code.
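To illustrate the idea, here is a toy sketch in plain Python (not Airflow's actual executor code): the executor effectively runs tasks in topological order, starting a task only once all of its upstream tasks have completed.

```python
# Toy sketch of dependency-driven execution, NOT Airflow's real executor.
# `tasks` is a list of task ids; `upstream` maps a task id to the ids it
# depends on (mirroring t2.set_upstream(t1) in the question's DAG).
def run_in_dependency_order(tasks, upstream):
    done = []
    remaining = set(tasks)
    while remaining:
        # A task becomes runnable once all of its upstream tasks have finished.
        runnable = [t for t in remaining
                    if all(u in done for u in upstream.get(t, []))]
        if not runnable:
            raise ValueError("cycle or unsatisfiable dependency")
        for t in sorted(runnable):
            done.append(t)  # stand-in for executing the task instance
            remaining.remove(t)
    return done

# Mirrors the question's DAG: 'sleep' runs only after 'print_date'.
order = run_in_dependency_order(["print_date", "sleep"],
                                {"sleep": ["print_date"]})
print(order)  # ['print_date', 'sleep']
```

This is only to show why you don't call run() per task yourself: once the dependencies are declared, the ordering falls out of the graph.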

As far as the run method itself, the code is still there:

  • DAG.run(...) - https://github.com/apache/incubator-airflow/blob/985a433a8d03906edfa49a99aa86c026769aff57/airflow/models.py#L4061-L4076
  • TaskInstance.run(...) - https://github.com/apache/incubator-airflow/blob/985a433a8d03906edfa49a99aa86c026769aff57/airflow/models.py#L1711-L1722

Questions

  1. When you say test a DAG "in the test environment" what do you mean exactly? Like on the CI or in unit tests?
  2. Is this code for a test or code from one of your actual DAGs?
  3. Is this related to your other recent question Test Dag run for Airflow 1.9 in unittest?
answered Sep 18 '22 by Taylor D. Edmiston