
Airflow 1.10.3 SubDag can only run 1 task in parallel even the concurrency is 8

Recently, I upgraded Airflow from 1.9 to 1.10.3 (the latest version).

However, I noticed a performance issue related to SubDag concurrency: only 1 task inside a SubDag gets picked up at a time, which is not how it should be, since our concurrency setting for the SubDag is 8.

See the following: get_monthly_summary-214 and get_monthly_summary-215 are the two SubDags; they can run in parallel, controlled by the parent DAG's concurrency:

[screenshot: parent DAG tree view with get_monthly_summary-214 and get_monthly_summary-215 running in parallel]

But when you zoom into one SubDag, say get_monthly_summary-214, you can clearly see that only 1 task runs at a time while the others stay queued, and it keeps running this way: [screenshot: SubDag tree view showing 1 running task, the rest queued]. When we check the SubDag's concurrency, it is actually 8, as we specified in the code: [screenshot: SubDag details page showing concurrency = 8]
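For reference, a minimal sketch of how we specify that concurrency when constructing the sub-DAG (load_sub_dag is our own factory helper; the exact body here is illustrative, but the concurrency argument is standard Airflow):

def load_sub_dag(parent_dag_name, child_dag_name, args, concurrency):
    # Each sub-DAG carries its own concurrency cap (8 in our case),
    # independent of the parent DAG's concurrency setting.
    return DAG(
        dag_id=f"{parent_dag_name}.{child_dag_name}",
        default_args=args,
        concurrency=concurrency,
    )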

We did set up the pool slot size (32), and we have 8 Celery workers to pick up the queued tasks. Our Airflow config settings related to concurrency are as follows:

# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 32

# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16

# The app name that will be used by celery
celery_app_name = airflow.executors.celery_executor

# The concurrency that will be used when starting workers with the
# "airflow worker" command. This defines the number of task instances that
# a worker will take, so size up your workers based on the resources on
# your worker box and the nature of your tasks
worker_concurrency = 16

Also, all the SubDag operators are configured to use a queue called mini, while all their inner tasks use the default queue called default, because we ran into deadlock problems before when running both the SubDag operators and the SubDag inner tasks on the same queue. I also tried using the default queue for all tasks and operators; it does not help.
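For illustration, a hedged sketch of that queue split (the task names are hypothetical, and it assumes dag and the sub-DAG object are already defined):

from airflow.operators.bash_operator import BashOperator
from airflow.operators.subdag_operator import SubDagOperator

# The SubDagOperator itself runs on the dedicated 'mini' queue...
summary = SubDagOperator(
    task_id="get_monthly_summary",
    subdag=monthly_summary_subdag,
    queue="mini",
    dag=dag,
)

# ...while the tasks inside the sub-DAG stay on the default queue.
inner_task = BashOperator(
    task_id="compute",
    bash_command="echo compute",
    queue="default",
    dag=monthly_summary_subdag,
)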

The old version 1.9 seemed fine: each SubDag could execute multiple tasks in parallel. Did we miss anything?

asked May 09 '19 by Kevin Li


3 Answers

Based on the finding posted by @kaxil below, a workaround, if you still want to execute tasks inside a SubDag in parallel, is to create a wrapper function that explicitly passes the executor when constructing the SubDagOperator:

from airflow.operators.subdag_operator import SubDagOperator
from airflow.executors import GetDefaultExecutor

def sub_dag_operator_with_default_executor(subdag, *args, **kwargs):
    # Override SubDagOperator's SequentialExecutor default with the
    # executor configured in airflow.cfg (CeleryExecutor in our case),
    # so the tasks inside the sub-DAG can actually run in parallel.
    return SubDagOperator(subdag=subdag, executor=GetDefaultExecutor(),
                          *args, **kwargs)

Call sub_dag_operator_with_default_executor when you create your SubDag operator. The performance concern that motivated the SequentialExecutor default is explained in the Airflow PR:

We should change the default executor for subdag_operator to SequentialExecutor. Airflow pool is not honored by subdagoperator, hence it could consume all the worker resources(e.g in celeryExecutor). This causes issues mentioned in airflow-74 and limits the subdag_operator usage. We use subdag_operator in production by specifying using sequential executor.

To relieve that concern, we suggest creating a special queue (we specify queue='mini' in our case) and a dedicated Celery worker to handle the SubDag operators, so that they do not consume all of your normal Celery workers' resources. As follows:

dag = DAG(
    dag_id=DAG_NAME,
    description=f"{DAG_NAME}-{__version__}",
    ...
)

with dag:
    ur_operator = sub_dag_operator_with_default_executor(
        task_id="your_task_id",
        subdag=load_sub_dag(
            parent_dag_name=DAG_NAME,
            child_dag_name="your_child_dag_name",
            args=args,
            concurrency=dag_config.get("concurrency_in_sub_dag") or DEFAULT_CONCURRENCY,
        ),
        queue="mini",
    )

Then, when you create your special Celery worker (we use a lightweight host with 2 cores and 3 GB of memory), specify AIRFLOW__CELERY__DEFAULT_QUEUE as mini. Depending on how many SubDag operators you want to run in parallel, you should create multiple special Celery workers to load-balance the resources. We suggest each special Celery worker handle at most 2 SubDag operators at a time, or it will be exhausted (e.g., run out of memory on a 2-core, 3 GB host).
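For example, a sketch of launching such a dedicated worker from Python (a hypothetical helper, equivalent to running the airflow worker CLI directly; the --queues and --concurrency flags exist in the 1.10 CLI):

import os
import subprocess

# Start a lightweight Celery worker bound to the 'mini' queue.
# --concurrency 2 caps it at two SubDagOperator slots at a time,
# matching the suggestion of at most 2 sub-DAG operators per small host.
env = dict(os.environ, AIRFLOW__CELERY__DEFAULT_QUEUE="mini")
subprocess.run(
    ["airflow", "worker", "--queues", "mini", "--concurrency", "2"],
    env=env,
    check=True,
)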

You can also adjust the concurrency inside your SubDag via the Airflow Variable concurrency_in_sub_dag, created on the Airflow UI Variables configuration page.
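For reference, a hypothetical sketch of how dag_config (used in the example above) might be read from that Variable, assuming it is stored as JSON under a key named dag_config:

from airflow.models import Variable

# Hypothetical: dag_config is a JSON Variable managed in the UI
# (Admin -> Variables); fall back to a default concurrency if unset.
DEFAULT_CONCURRENCY = 8
dag_config = Variable.get("dag_config", deserialize_json=True, default_var={})
concurrency = dag_config.get("concurrency_in_sub_dag") or DEFAULT_CONCURRENCY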

Update [22/05/2020]: the above only works for Airflow >= 1.10.0 and <= 1.10.3. For Airflow beyond 1.10.3, please use

from airflow.executors import get_default_executor

instead.
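For example, a sketch of the same wrapper under the newer import (assuming the rest of the pattern is unchanged):

from airflow.operators.subdag_operator import SubDagOperator
from airflow.executors import get_default_executor

def sub_dag_operator_with_default_executor(subdag, *args, **kwargs):
    # get_default_executor() resolves the executor configured in
    # airflow.cfg, overriding SubDagOperator's SequentialExecutor default.
    return SubDagOperator(subdag=subdag, executor=get_default_executor(),
                          *args, **kwargs)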

answered Oct 21 '22 by Kevin Li


Thanks! I changed the code a little for the latest Airflow (1.10.5), where GetDefaultExecutor no longer works:

from airflow.operators.subdag_operator import SubDagOperator
from airflow.executors.celery_executor import CeleryExecutor

def sub_dag_operator_with_celery_executor(subdag, *args, **kwargs):
    # Pass a CeleryExecutor explicitly so SubDag tasks run in parallel.
    return SubDagOperator(subdag=subdag, executor=CeleryExecutor(),
                          *args, **kwargs)
answered Oct 21 '22 by Ohad Mata


That is because in Airflow 1.9.0, the SubDagOperator used the default executor:

Airflow 1.9.0: https://github.com/apache/airflow/blob/1.9.0/airflow/operators/subdag_operator.py#L33

class SubDagOperator(BaseOperator):

    template_fields = tuple()
    ui_color = '#555'
    ui_fgcolor = '#fff'

    @provide_session
    @apply_defaults
    def __init__(
            self,
            subdag,
            executor=GetDefaultExecutor(),
            *args, **kwargs):

However, from Airflow 1.10 onwards, the default executor for SubDagOperator was changed to SequentialExecutor:

Airflow >=1.10: https://github.com/apache/airflow/blob/1.10.0/airflow/operators/subdag_operator.py#L38

class SubDagOperator(BaseOperator):

    template_fields = tuple()
    ui_color = '#555'
    ui_fgcolor = '#fff'

    @provide_session
    @apply_defaults
    def __init__(
            self,
            subdag,
            executor=SequentialExecutor(),
            *args, **kwargs):

The commit that changed it is https://github.com/apache/airflow/commit/64d950166773749c0e4aa0d7032b080cadd56a53#diff-45749879e4753a355c5bdb5203584698

And the detailed reason it was changed can be found in https://github.com/apache/airflow/pull/3251

We should change the default executor for subdag_operator to SequentialExecutor. Airflow pool is not honored by subdagoperator, hence it could consume all the worker resources(e.g in celeryExecutor). This causes issues mentioned in airflow-74 and limits the subdag_operator usage. We use subdag_operator in production by specifying using sequential executor.

answered Oct 21 '22 by kaxil