What is the best way to rerun a task (A) three times sequentially?
That is: task A -> task A -> task A -> task B.
I ask because I will run a separate data validation task (B) that compares the data from those three runs.
So this is what I have done so far:
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG("hello_world_0", description="Starting tutorial", schedule_interval='* * * * *',
          start_date=datetime(2019, 1, 1),
          catchup=False)

data_pull_1 = BashOperator(task_id='attempt_1', bash_command='echo "Hello World - 1!"', dag=dag)
data_pull_2 = BashOperator(task_id='attempt_2', bash_command='echo "Hello World - 2!"', dag=dag)
data_pull_3 = BashOperator(task_id='attempt_3', bash_command='echo "Hello World - 3!"', dag=dag)
data_validation = BashOperator(task_id='data_validation', bash_command='echo "Data Validation!"', dag=dag)

data_pull_1 >> data_pull_2 >> data_pull_3 >> data_validation
This might work but is there a more elegant way?
You can try the implementation below, which creates the three tasks in a for loop:
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
dag = DAG(
    "hello_world_0",
    description="Starting tutorial",
    schedule_interval=None,
    start_date=datetime(2019, 1, 1),
    catchup=False
)

chain_operators = []
max_attempt = 3
for attempt in range(1, max_attempt + 1):  # start at 1 to match attempt_1..attempt_3
    data_pull = BashOperator(
        task_id='attempt_{}'.format(attempt),
        bash_command='echo "Hello World - {}!"'.format(attempt),
        dag=dag
    )
    chain_operators.append(data_pull)

data_validation = BashOperator(task_id='data_validation', bash_command='echo "Data Validation!"', dag=dag)
chain_operators.append(data_validation)

# Link each task to the next one in the list
for i, val in enumerate(chain_operators[:-1]):
    val.set_downstream(chain_operators[i + 1])
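The pairwise set_downstream loop at the end can also be written with zip over adjacent pairs, which avoids the index arithmetic. Here is a minimal, Airflow-free sketch of that linking pattern; the Task class is a hypothetical stand-in for BashOperator, used only to show the wiring:

```python
class Task:
    """Stand-in for an Airflow operator: records its downstream tasks."""
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def set_downstream(self, other):
        self.downstream.append(other)

# Build attempt_1..attempt_3 followed by the validation task.
tasks = [Task("attempt_{}".format(i)) for i in range(1, 4)]
tasks.append(Task("data_validation"))

# Link each task to its successor by zipping the list with itself shifted by one.
for upstream, downstream in zip(tasks, tasks[1:]):
    upstream.set_downstream(downstream)
```

Recent Airflow versions also ship a chain helper (airflow.models.baseoperator.chain) that does this same pairwise linking in one call: chain(*chain_operators).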
I changed schedule_interval to None because with '* * * * *' the job would be triggered every minute.