I have 30 individual tasks in a DAG with no dependencies between them. They all run the same code; the only difference is the data volume, so some tasks finish in seconds while others take 2 hours or more.
The problem is that during catchup, the tasks that finish in seconds are blocked by the tasks that take hours before they can move on to the next execution date.
I could break them up into individual DAGs, but that seems silly, and the 30 tasks will grow to a bigger number in the future.
Is there any way to run tasks in the same DAG at different execution times? That is, as soon as a task finishes, it takes on the next execution date regardless of how the other tasks are doing.
Adding a pic for illustration. Basically, I'd like to see two more solid green boxes on the first row while the third row is still running behind.
Edit:
After y2k-shubham's explanation, I tried to implement it, but it's still not working. The fast task starts at 2019-01-30 00, finishes in a second, and does not start 2019-01-30 01 because the slow task is still running. Ideally, it'd run 2019-01-30 01, 2019-01-30 02, 2019-01-30 03... in parallel.
Adding a code example:
import time
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

default_args = {
    'owner': 'test',
    'depends_on_past': False,
    'start_date': datetime(2019, 1, 30, 0, 0, 0),
    'trigger_rule': TriggerRule.DUMMY
}

dag = DAG(dag_id='test_dag', default_args=default_args, schedule_interval='@hourly')


def fast(**kwargs):
    return 1


def slow(**kwargs):
    time.sleep(600)
    return 1


fast_task = PythonOperator(
    task_id='fast',
    python_callable=fast,
    provide_context=True,
    priority_weight=10000,
    pool='fast_pool',
    # weight_rule='upstream',  # using 1.9, this param doesn't exist
    dag=dag
)

slow_task = PythonOperator(
    task_id='slow',
    python_callable=slow,
    provide_context=True,
    priority_weight=500,
    pool='slow_pool',
    # weight_rule='upstream',  # using 1.9, this param doesn't exist
    dag=dag
)

fast_task >> slow_task  # not working
Turns out there are two variables that can be set that will solve my problem very easily: concurrency and max_active_runs.
In the example below, up to 4 runs of the DAG can be active at the same time, and up to 16 task instances of the DAG can run across those runs (for example, 4 tasks in each of 4 runs). Other combinations are also possible.
dag = DAG(
    dag_id='sample_dag',
    default_args=default_args,
    schedule_interval='@daily',
    # this will allow up to 16 task instances of this dag to run at the same time
    concurrency=16,
    # this will allow up to 4 runs of this dag to be active at the same time
    max_active_runs=4,
)
I can think of 3 possible solutions to your woes (will add more alternatives when they come to mind):
Set start_date on individual tasks within the DAG (apart from the start_date of the DAG itself), as told here. However, I would never favour this approach because it would be like a step back onto the same time-based crons that Airflow tries to replace.
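For illustration only, a minimal sketch of what that looks like (the DAG id, task names and the offset are made up; this is not something I'd recommend, for the reason above). A per-task start_date overrides the DAG-level one for that task alone, so the scheduler creates fewer catchup runs for it:

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(dag_id='staggered_start_dag',
          start_date=datetime(2019, 1, 30),
          schedule_interval='@hourly')

fast_task = PythonOperator(
    task_id='fast',
    python_callable=lambda: 1,
    dag=dag,
)

slow_task = PythonOperator(
    task_id='slow',
    python_callable=lambda: 1,
    # per-task start_date: this task's schedule begins 12 hours later
    start_date=datetime(2019, 1, 30, 12),
    dag=dag,
)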
Use pools to segregate tasks by runtime / priority. Here's an idea (you might need to rework it as per your requirements): put all tiny tasks in tiny_task_pool and all big ones in big_task_pool. Let tiny_task_pool have a significantly higher number of slots than big_task_pool. That would make starvation of your tiny tasks much less likely. You can get creative with even more levels of pools.
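A rough sketch of the pool idea, assuming the two pools (the names, slot counts and toy callables are made up) have already been created under Admin -> Pools or via the airflow pool CLI:

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(dag_id='pooled_dag',
          start_date=datetime(2019, 1, 30),
          schedule_interval='@hourly')

# tiny tasks share a large pool, so a backlog of big tasks
# (competing for the few big_task_pool slots) cannot starve them
for i in range(10):
    PythonOperator(
        task_id='tiny_task_{}'.format(i),
        python_callable=lambda: 1,
        pool='tiny_task_pool',   # e.g. 24 slots
        dag=dag,
    )

for i in range(3):
    PythonOperator(
        task_id='big_task_{}'.format(i),
        python_callable=lambda: 1,
        pool='big_task_pool',    # e.g. 4 slots
        dag=dag,
    )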
Even if your task
s have no real dependencies between them, it shouldn't hurt much to deliberately introduce some dependencies so that all (or most) big tasks are made downstream
of tiny ones (and hence change structure of your DAG
). That would dub into a shortest-job-first kind of approach. You can also explore priority_weight
/ priority_rule
to gain even more fine-grained control.
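A sketch of that idea, with made-up task names and weights: every big task is placed downstream of every tiny task, and priority_weight gives the scheduler an extra nudge when slots are scarce.

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(dag_id='sjf_dag',
          start_date=datetime(2019, 1, 30),
          schedule_interval='@hourly')

tiny_tasks = [PythonOperator(task_id='tiny_{}'.format(i),
                             python_callable=lambda: 1,
                             priority_weight=100,   # picked up first
                             dag=dag)
              for i in range(5)]

big_tasks = [PythonOperator(task_id='big_{}'.format(i),
                            python_callable=lambda: 1,
                            priority_weight=1,
                            dag=dag)
             for i in range(2)]

# artificial dependency: big tasks only start once all tiny tasks
# of the same run have finished (shortest-job-first within a run)
for tiny in tiny_tasks:
    for big in big_tasks:
        tiny >> big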
All the above alternatives assume that the tasks' lengths (durations of execution) are known ahead of time. In the real world, that might not be true; or even if it is, it might gradually change over time. For that, I'd suggest you tweak your dag-definition script to factor in the average (or median) runtime of your tasks over the last 'n' runs to decide their priority; the sketch below shows one way such durations could be pulled from the metadata DB.
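One possible sketch, assuming the DAG file is allowed to query Airflow's own metadata DB at parse time; the helper name, the fallback value and the weight formula are all made up for illustration:

from airflow import settings
from airflow.models import TaskInstance


def avg_duration(dag_id, task_id, last_n=10, default=60.0):
    """Average duration (seconds) of the last `last_n` recorded runs of a task."""
    session = settings.Session()
    try:
        rows = (session.query(TaskInstance.duration)
                       .filter(TaskInstance.dag_id == dag_id,
                               TaskInstance.task_id == task_id,
                               TaskInstance.duration.isnot(None))
                       .order_by(TaskInstance.execution_date.desc())
                       .limit(last_n)
                       .all())
    finally:
        session.close()
    durations = [d for (d,) in rows]
    return sum(durations) / len(durations) if durations else default

# shorter historical runtime -> higher priority (formula is arbitrary), e.g.:
# priority = int(10000.0 / max(avg_duration('my_dag', 'my_task'), 1.0))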
Concretely, per method:
- For the start_date method, just supply a later start_date (actually the same date, a later time) to tasks that ran longer in previous runs.
- For the pools method, move tasks around different pools based on their previous running durations.
- For the task-dependency method, make the tasks that ran longer in previous runs downstream. This might sound difficult, but you can visualize it like this: create 3 DummyOperators and link them up (one after another), then fill in all the small tasks between the first 2 DummyOperators and the big ones between the next two (see the sketch after this list).
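A sketch of that fence layout with made-up task names (only the wiring matters):

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

dag = DAG(dag_id='fenced_dag',
          start_date=datetime(2019, 1, 30),
          schedule_interval='@hourly')

# start -> [small tasks] -> barrier -> [big tasks] -> end
start = DummyOperator(task_id='start', dag=dag)
barrier = DummyOperator(task_id='barrier', dag=dag)
end = DummyOperator(task_id='end', dag=dag)

small_tasks = [PythonOperator(task_id='small_{}'.format(i),
                              python_callable=lambda: 1,
                              dag=dag)
               for i in range(5)]
big_tasks = [PythonOperator(task_id='big_{}'.format(i),
                            python_callable=lambda: 1,
                            dag=dag)
             for i in range(2)]

for t in small_tasks:
    start >> t >> barrier
for t in big_tasks:
    barrier >> t >> end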