Recently I upgraded Airflow from 1.9 to 1.10.3 (the latest release at the time).
However, I noticed a performance issue related to SubDag concurrency. Only 1 task inside the SubDag gets picked up at a time, which is not how it should be: our concurrency setting for the SubDag is 8.
See the following:
get_monthly_summary-214 and get_monthly_summary-215 are the two SubDags; they can run in parallel, controlled by the parent DAG's concurrency.
But when zooming into one of the SubDags, say get_monthly_summary-214, you can definitely see that there is only 1 task running at a time while the others are queued, and it keeps running this way. When we check the SubDag concurrency, it is actually 8, as we specified in the code:
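A minimal sketch of how a SubDag's DAG can declare that concurrency (the dag id and default_args here are placeholders, not our actual code):

from datetime import datetime
from airflow import DAG

default_args = {"owner": "airflow", "start_date": datetime(2019, 1, 1)}

# The SubDag's own DAG object; concurrency=8 is the SubDag-level limit we expect to be honored.
monthly_summary_subdag = DAG(
    dag_id="parent_dag.get_monthly_summary",
    default_args=default_args,
    schedule_interval=None,
    concurrency=8,
)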
We did set up the pool slot size; it is 32. We have 8 Celery workers to pick up the queued tasks, and our Airflow config related to concurrency is as follows:
# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 32
# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16
# The app name that will be used by celery
celery_app_name = airflow.executors.celery_executor
# The concurrency that will be used when starting workers with the
# "airflow worker" command. This defines the number of task instances that
# a worker will take, so size up your workers based on the resources on
# your worker box and the nature of your tasks
worker_concurrency = 16
Also, all the SubDag operators are configured to use a queue called mini, while all their inner tasks use the default queue called default, because we ran into some deadlock problems before when running both the SubDag operators and the SubDags' inner tasks on the same queue. I also tried using the default queue for all the tasks and operators; it does not help.
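For illustration, a minimal self-contained sketch of that queue split (the DAG ids, schedule and the dummy inner task are placeholders, not our actual pipeline):

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator

default_args = {"owner": "airflow", "start_date": datetime(2019, 1, 1)}

parent_dag = DAG("parent_dag", default_args=default_args, schedule_interval=None)
inner_dag = DAG("parent_dag.get_monthly_summary", default_args=default_args,
                schedule_interval=None, concurrency=8)

# Tasks inside the SubDag stay on the normal 'default' queue.
inner_task = DummyOperator(task_id="inner_task", queue="default", dag=inner_dag)

# The SubDagOperator itself is routed to the dedicated 'mini' queue.
get_monthly_summary = SubDagOperator(
    task_id="get_monthly_summary", subdag=inner_dag, queue="mini", dag=parent_dag)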
The old version 1.9 seemed to be fine: each SubDag could execute multiple tasks in parallel. Did we miss anything?
Based on the finding @kaxil posted (see the explanation further down), a workaround, if you still would like to execute tasks inside a SubDag in parallel, is to create a wrapper function that explicitly passes the executor when constructing the SubDagOperator:
from airflow.operators.subdag_operator import SubDagOperator
from airflow.executors import GetDefaultExecutor

def sub_dag_operator_with_default_executor(subdag, *args, **kwargs):
    return SubDagOperator(subdag=subdag, executor=GetDefaultExecutor(), *args, **kwargs)
Call sub_dag_operator_with_default_executor when you create your SubDag operator. To relieve the SubDag operator performance concerns quoted below:
We should change the default executor for subdag_operator to SequentialExecutor. Airflow pool is not honored by subdagoperator, hence it could consume all the worker resources(e.g in celeryExecutor). This causes issues mentioned in airflow-74 and limits the subdag_operator usage. We use subdag_operator in production by specifying using sequential executor.
we suggest creating a special queue (we specify queue='mini' in our case) and a dedicated Celery worker to handle the subdag_operator, so that it does not consume all of your normal Celery workers' resources. For example:
dag = DAG(
    dag_id=DAG_NAME,
    description=f"{DAG_NAME}-{__version__}",
    ...
)

with dag:
    ur_operator = sub_dag_operator_with_default_executor(
        task_id="your_task_id",
        subdag=load_sub_dag(
            parent_dag_name=DAG_NAME,
            child_dag_name="your_child_dag_name",
            args=args,
            concurrency=dag_config.get("concurrency_in_sub_dag") or DEFAULT_CONCURRENCY,
        ),
        queue="mini",
        dag=dag,
    )
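load_sub_dag above is our own factory and is not shown in full; a minimal sketch of what it might look like (the SubDag's tasks are elided):

from airflow import DAG

def load_sub_dag(parent_dag_name, child_dag_name, args, concurrency):
    # dag_id follows Airflow's "parent.child" naming convention for SubDags.
    sub_dag = DAG(
        dag_id=f"{parent_dag_name}.{child_dag_name}",
        default_args=args,
        concurrency=concurrency,
    )
    # ... define the SubDag's tasks on sub_dag here ...
    return sub_dag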
Then, when you create your special Celery worker (we use a lightweight host, e.g. 2 cores and 3 GB of memory), specify AIRFLOW__CELERY__DEFAULT_QUEUE as mini. Depending on how many SubDag operators you would like to run in parallel, you should create multiple special Celery workers to load-balance the resources; we suggest that each special Celery worker handle at most 2 SubDag operators at a time, or it will be exhausted (e.g., run out of memory on a 2-core, 3 GB host).
Also, you can adjust the concurrency inside your SubDag via the variable concurrency_in_sub_dag created on the Airflow UI Variables configuration page.
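A minimal sketch of how that variable can feed the dag_config used above (the Variable key dag_config and the fallback value are assumptions):

from airflow.models import Variable

DEFAULT_CONCURRENCY = 8  # assumed fallback when the variable is not set

# dag_config is assumed to be a JSON Variable maintained on the Airflow UI Variables page,
# e.g. {"concurrency_in_sub_dag": 4}
dag_config = Variable.get("dag_config", default_var={}, deserialize_json=True)
concurrency = dag_config.get("concurrency_in_sub_dag") or DEFAULT_CONCURRENCY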
Update [22/05/2020]: the above only works for Airflow >= 1.10.0 and <= 1.10.3. For Airflow beyond 1.10.3, please use
from airflow.executors import get_default_executor
instead.
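For example, the wrapper on those newer 1.10.x releases would look roughly like this (a sketch, assuming get_default_executor is available as noted above):

from airflow.operators.subdag_operator import SubDagOperator
from airflow.executors import get_default_executor

def sub_dag_operator_with_default_executor(subdag, *args, **kwargs):
    return SubDagOperator(subdag=subdag, executor=get_default_executor(), *args, **kwargs)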
Thanks! I changed the code a little for the latest Airflow (1.10.5), since GetDefaultExecutor is not working anymore:
from airflow.executors.celery_executor import CeleryExecutor
from airflow.operators.subdag_operator import SubDagOperator

def sub_dag_operator_with_celery_executor(subdag, *args, **kwargs):
    return SubDagOperator(subdag=subdag, executor=CeleryExecutor(), *args, **kwargs)
That is because in Airflow 1.9.0, the default executor was used by SubDagOperator.
Airflow 1.9.0: https://github.com/apache/airflow/blob/1.9.0/airflow/operators/subdag_operator.py#L33
class SubDagOperator(BaseOperator):
    template_fields = tuple()
    ui_color = '#555'
    ui_fgcolor = '#fff'

    @provide_session
    @apply_defaults
    def __init__(
            self,
            subdag,
            executor=GetDefaultExecutor(),
            *args, **kwargs):
However, from Airflow 1.10 onwards, the default executor for SubDagOperator was changed to SequentialExecutor:
Airflow >=1.10: https://github.com/apache/airflow/blob/1.10.0/airflow/operators/subdag_operator.py#L38
class SubDagOperator(BaseOperator):
    template_fields = tuple()
    ui_color = '#555'
    ui_fgcolor = '#fff'

    @provide_session
    @apply_defaults
    def __init__(
            self,
            subdag,
            executor=SequentialExecutor(),
            *args, **kwargs):
The commit that changed it is https://github.com/apache/airflow/commit/64d950166773749c0e4aa0d7032b080cadd56a53#diff-45749879e4753a355c5bdb5203584698
And the detailed reason it was changed can be found in https://github.com/apache/airflow/pull/3251
We should change the default executor for subdag_operator to SequentialExecutor. Airflow pool is not honored by subdagoperator, hence it could consume all the worker resources(e.g in celeryExecutor). This causes issues mentioned in airflow-74 and limits the subdag_operator usage. We use subdag_operator in production by specifying using sequential executor.