
Kubernetes executor does not parallelize sub-DAG execution in Airflow

We moved away from the CeleryExecutor in Airflow 1.10.0 because of some execution limitations, and we are now using the KubernetesExecutor.

Right now we are not able to parallelize all the tasks in some DAGs, even when we change the subdag_operator in the code directly: https://github.com/apache/incubator-airflow/blob/v1-10-stable/airflow/operators/subdag_operator.py#L38
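
For context, that modification boils down to overriding the executor the SubDagOperator uses for its nested tasks. The same idea expressed directly in a DAG file looks roughly like this (a stripped-down sketch only, not our real pipeline; the DAG names, subdag factory and dummy tasks are placeholders):

from datetime import datetime

from airflow import DAG
from airflow.executors.local_executor import LocalExecutor
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator

default_args = {"start_date": datetime(2018, 9, 1)}


def make_subdag(parent_dag_id, child_id, args):
    # Child DAG whose tasks we want to run in parallel.
    subdag = DAG(dag_id="{}.{}".format(parent_dag_id, child_id), default_args=args)
    for i in range(5):
        DummyOperator(task_id="task_{}".format(i), dag=subdag)
    return subdag


parent_dag = DAG(dag_id="parent_dag", default_args=default_args, schedule_interval=None)

section_1 = SubDagOperator(
    task_id="section-1",
    subdag=make_subdag("parent_dag", "section-1", default_args),
    executor=LocalExecutor(),  # override the default executor used for the subdag's tasks
    dag=parent_dag,
)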

Our expectation was that, with these modifications and the KubernetesExecutor, we could fan out the execution of all tasks at the same time, but we still see the same behavior as with the SequentialExecutor.

This is the behavior that we have right now:

[screenshot: the subdag's tasks executing one at a time]

We would like to execute all of them at the same time using KubernetesExecutor.

asked Sep 03 '18 by Flavio

People also ask

How do you use Kubernetes executor in Airflow?

The Kubernetes executor runs each task instance in its own pod on a Kubernetes cluster. KubernetesExecutor runs as a process in the Airflow Scheduler. The scheduler itself does not necessarily need to be running on Kubernetes, but does need access to a Kubernetes cluster.

Which executor is best for Airflow?

Airflow comes configured with the SequentialExecutor by default, which is a local executor, and the safest option for execution, but we strongly recommend you change this to LocalExecutor for small, single-machine installations, or one of the remote executors for a multi-machine/cloud installation.

What is the purpose of an executor in Airflow?

The Executor acts as a middle man to handle resource utilization and how to distribute work best. Although an Airflow job is organized at the DAG level, the execution phase of a job is more granular, and the Executor runs at the task level.

How does Airflow work with Kubernetes?

The Airflow local settings file ( airflow_local_settings.py ) can define a pod_mutation_hook function that has the ability to mutate pod objects before sending them to the Kubernetes client for scheduling. It receives a single argument as a reference to pod objects, and is expected to alter its attributes.
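
For illustration, a minimal sketch of such a hook might look like the following. This assumes Airflow 1.10, where the argument is Airflow's own Pod wrapper (later versions pass a Kubernetes V1Pod, so the available attributes differ), and the label key and value are made up:

# airflow_local_settings.py -- minimal sketch of a pod_mutation_hook
def pod_mutation_hook(pod):
    # 'pod' is the worker pod object built for a task instance; mutate its
    # attributes in place before Airflow submits it to the Kubernetes API.
    pod.labels = dict(pod.labels or {})
    pod.labels["team"] = "data-platform"  # made-up label, for illustration only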


1 Answer

The KubernetesExecutor in Airflow turns each first-level task into its own worker pod, and inside that pod the task runs with the LocalExecutor.

This means your SubDagOperator is executed by a LocalExecutor inside its worker pod.

To run the tasks under the SubDagOperator in parallel once the worker pod is spawned, you need to set the parallelism configuration for that worker pod. If you define the worker pod with a YAML template, edit it to something like this:

apiVersion: v1
kind: Pod
metadata:
  name: dummy-name
spec:
  containers:
    - args: []
      command: []
      env:
        ###################################
        # This is the part you need to add
        ###################################
        - name: AIRFLOW__CORE__PARALLELISM
          value: "10"  # env values must be strings in Kubernetes
        ###################################
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        # Hard Coded Airflow Envs
        - name: AIRFLOW__CORE__FERNET_KEY
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-fernet-key
              key: fernet-key
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
        - name: AIRFLOW_CONN_AIRFLOW_DB
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
      envFrom: []
      image: dummy_image
      imagePullPolicy: IfNotPresent
      name: base
      ports: []
      volumeMounts:
        - mountPath: "/opt/airflow/logs"
          name: airflow-logs
        - mountPath: /opt/airflow/dags
          name: dags  # must match the volume name defined under .volumes below
          readOnly: false
  hostNetwork: false
  restartPolicy: Never
  securityContext:
    runAsUser: 50000
  nodeSelector:
    {}
  affinity:
    {}
  tolerations:
    []
  serviceAccountName: 'RELEASE-NAME-worker-serviceaccount'
  volumes:
    - name: dags
      persistentVolumeClaim:
        claimName: RELEASE-NAME-dags
    - emptyDir: {}
      name: airflow-logs
    - configMap:
        name: RELEASE-NAME-airflow-config
      name: airflow-config
    - configMap:
        name: RELEASE-NAME-airflow-config
      name: airflow-local-settings

The SubDagOperator will then respect the specified parallelism and run its tasks in parallel.
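
Note that the environment variable is set on the worker pod itself, so the parallelism limit applies per SubDagOperator pod rather than across the whole Airflow installation.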

answered Oct 16 '22 by Ryan Siu