I am trying to test a DAG with more than one task in the test environment. I was able to test a single task associated with the DAG, but I want to create several tasks in the DAG and kick off the first one. For testing one task in a DAG I am using

```python
task1.run()
```

which executes fine. But the same approach does not work when I have several tasks chained one after another downstream in the DAG.
```python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],  # placeholder address
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('tutorial', default_args=default_args)

# t1 and t2 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

# t2 runs only after t1 has succeeded
t2.set_upstream(t1)

t1.run()  # It is executing just the first task.
```
To run the second task I have to call `t2.run()` as well, which I don't want, since I am designing a DAG. How can I achieve this?
I'm not totally sure I understand your question yet, but I'll take a stab at starting an answer.
If your goal is just to run the DAG or a subset of its tasks manually, you can do this from the CLI, for example:
- `$ airflow run ...` - run a task instance
- `$ airflow test ...` - test a task instance without checking dependencies or recording state in the database
- `$ airflow trigger_dag ...` - trigger a specific DAG run of a DAG

CLI docs - https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html
I think the airflow run command is the one most relevant to your use case.
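As a hedged sketch of how the CLI commands listed above could replace the `run()` calls, the Python below builds one Airflow 1.x style `airflow test <dag_id> <task_id> <execution_date>` (or `airflow run ...`) call per task, upstream task first. Because `airflow test` ignores dependencies, the ordering is up to you. The helper names `airflow_cli_commands` and `invoke` are hypothetical, and the execution date `2015-06-01` is just the `start_date` from the question; by default nothing is executed, the commands are only printed.

```python
import shlex
import subprocess
from typing import List, Sequence


def airflow_cli_commands(dag_id: str, task_ids: Sequence[str],
                         execution_date: str, subcommand: str = "test") -> List[List[str]]:
    """Build one Airflow 1.x CLI call per task: airflow <subcommand> <dag_id> <task_id> <date>."""
    if subcommand not in ("test", "run"):
        raise ValueError("expected 'test' or 'run'")
    return [["airflow", subcommand, dag_id, task_id, execution_date] for task_id in task_ids]


def invoke(commands: List[List[str]], dry_run: bool = True) -> List[str]:
    """Print each command as a shell line; only execute it when dry_run is False."""
    printed = []
    for cmd in commands:
        line = shlex.join(cmd)  # shlex.join needs Python 3.8+
        print(line)
        printed.append(line)
        if not dry_run:
            # check=True raises on the first failing task, so later (downstream) tasks are skipped.
            subprocess.run(cmd, check=True)
    return printed


# Upstream task first, matching t2.set_upstream(t1) in the question.
cmds = airflow_cli_commands("tutorial", ["print_date", "sleep"], "2015-06-01")
invoke(cmds)  # dry run: prints the two commands without calling airflow
```

On Airflow 2.x the equivalent subcommands are `airflow tasks test` and `airflow tasks run`, so the command lists would need adjusting there.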
At runtime, scheduling the tasks in your DAGs and running downstream tasks once their upstream requirements are met is all handled automatically by the scheduler and the executor. You shouldn't need to call `run()` anywhere in your code.
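To illustrate the idea only (this is not Airflow's implementation, which also deals with trigger rules, retries and task states), here is a minimal pure-Python sketch of dependency-driven execution: a task becomes runnable once every upstream task has finished, which is roughly why `t2` never needs its own `run()` call when the scheduler is in charge. `ToyTask` and `run_in_dependency_order` are hypothetical names, not part of the Airflow API, and the tasks record strings instead of actually running `date` or `sleep 5`.

```python
from collections import deque
from typing import Callable, Dict, List


class ToyTask:
    """Hypothetical stand-in for an Airflow operator (not the real API)."""

    def __init__(self, task_id: str, action: Callable[[], None]):
        self.task_id = task_id
        self.action = action
        self.upstream: List["ToyTask"] = []
        self.downstream: List["ToyTask"] = []

    def set_upstream(self, other: "ToyTask") -> None:
        # Mirrors the t2.set_upstream(t1) call in the question.
        self.upstream.append(other)
        other.downstream.append(self)


def run_in_dependency_order(tasks: List[ToyTask]) -> List[str]:
    """Run each task once all of its upstream tasks have finished (Kahn's algorithm)."""
    remaining: Dict[str, int] = {t.task_id: len(t.upstream) for t in tasks}
    ready = deque(t for t in tasks if remaining[t.task_id] == 0)
    order: List[str] = []
    while ready:
        task = ready.popleft()
        task.action()
        order.append(task.task_id)
        # Finishing this task may unblock its downstream tasks.
        for child in task.downstream:
            remaining[child.task_id] -= 1
            if remaining[child.task_id] == 0:
                ready.append(child)
    if len(order) != len(tasks):
        raise ValueError("cycle detected: not every task could be scheduled")
    return order


log: List[str] = []
t1 = ToyTask("print_date", lambda: log.append("date"))
t2 = ToyTask("sleep", lambda: log.append("sleep 5"))
t2.set_upstream(t1)
print(run_in_dependency_order([t2, t1]))  # ['print_date', 'sleep']
```

Only the root task is started explicitly by the loop; the downstream task runs because its upstream count drops to zero, not because anything calls it directly.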
As for the `run` method itself, the code is still there: