We moved away from the CeleryExecutor in Airflow 1.10.0 because of some execution limitations, and right now we're using the KubernetesExecutor.
Right now we're not able to parallelize all the tasks in some DAGs, even when we change the subdag_operator default executor in the code directly: https://github.com/apache/incubator-airflow/blob/v1-10-stable/airflow/operators/subdag_operator.py#L38
Our expectation was that, with these modifications and the KubernetesExecutor, we could fan out the execution of all tasks at the same time, but we still see the same behavior as the SequentialExecutor.
This is the behavior we have right now: the subdag tasks run one after another. We would like to execute all of them at the same time using the KubernetesExecutor.
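For reference, patching the default executor in subdag_operator.py should be equivalent to passing an executor to the operator itself, which SubDagOperator accepts in Airflow 1.10. The sketch below only illustrates our setup, not the real pipeline (the DAG id, task ids, and the ten dummy tasks are placeholders):

from datetime import datetime

from airflow import DAG
from airflow.executors.local_executor import LocalExecutor
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator

DEFAULT_ARGS = {"owner": "airflow", "start_date": datetime(2019, 1, 1)}


def build_subdag(parent_dag_id, child_task_id, default_args):
    # Ten independent tasks: no dependencies between them, so all of them
    # should be eligible to run at the same time.
    subdag = DAG(
        dag_id="{}.{}".format(parent_dag_id, child_task_id),
        default_args=default_args,
        schedule_interval=None,
    )
    for i in range(10):
        DummyOperator(task_id="task_{}".format(i), dag=subdag)
    return subdag


with DAG(dag_id="fan_out_example", default_args=DEFAULT_ARGS,
         schedule_interval=None) as dag:
    SubDagOperator(
        task_id="fan_out",
        subdag=build_subdag("fan_out_example", "fan_out", DEFAULT_ARGS),
        # Equivalent to the source change: avoid the SequentialExecutor
        # default so the subdag's tasks are not forced to run one by one.
        executor=LocalExecutor(),
    )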
The Kubernetes executor runs each task instance in its own pod on a Kubernetes cluster. KubernetesExecutor runs as a process in the Airflow Scheduler. The scheduler itself does not necessarily need to be running on Kubernetes, but does need access to a Kubernetes cluster.
Airflow comes configured with the SequentialExecutor by default, which is a local executor, and the safest option for execution, but we strongly recommend you change this to LocalExecutor for small, single-machine installations, or one of the remote executors for a multi-machine/cloud installation.
The Executor acts as a middle man to handle resource utilization and how to distribute work best. Although an Airflow job is organized at the DAG level, the execution phase of a job is more granular, and the Executor runs at the task level.
The Airflow local settings file (airflow_local_settings.py) can define a pod_mutation_hook function that has the ability to mutate pod objects before sending them to the Kubernetes client for scheduling. It receives a single argument, a reference to the pod object, and is expected to alter its attributes.
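As a minimal sketch of such a hook, assuming the Airflow 1.10-style Pod object where labels and envs are plain dict attributes (newer releases pass a Kubernetes V1Pod instead, so the attribute names would differ):

# airflow_local_settings.py
def pod_mutation_hook(pod):
    # Mutate the worker pod in place before it is handed to the Kubernetes client.
    pod.labels = pod.labels or {}
    pod.labels["mutated-by"] = "pod_mutation_hook"  # hypothetical label
    # The same parallelism override shown in the YAML below could also be
    # injected here instead of editing the pod template.
    pod.envs = pod.envs or {}
    pod.envs["AIRFLOW__CORE__PARALLELISM"] = "10"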
The Kubernetes Executor in Airflow turns each first-level task into a worker pod that runs a Local Executor. This means it is the Local Executor that actually executes your SubDagOperator.
To run the tasks under the SubDagOperator in parallel after the worker pod is spawned, you need to set the parallelism configuration for the worker pod. So, if you are using the YAML format for the worker pod, you will need to edit it to something like this:
apiVersion: v1
kind: Pod
metadata:
  name: dummy-name
spec:
  containers:
    - args: []
      command: []
      env:
        ###################################
        # This is the part you need to add
        ###################################
        - name: AIRFLOW__CORE__PARALLELISM
          value: "10"
        ###################################
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        # Hard Coded Airflow Envs
        - name: AIRFLOW__CORE__FERNET_KEY
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-fernet-key
              key: fernet-key
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
        - name: AIRFLOW_CONN_AIRFLOW_DB
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
      envFrom: []
      image: dummy_image
      imagePullPolicy: IfNotPresent
      name: base
      ports: []
      volumeMounts:
        - mountPath: "/opt/airflow/logs"
          name: airflow-logs
        - mountPath: /opt/airflow/dags
          name: airflow-dags
          readOnly: false
        - mountPath: /opt/airflow/dags
          name: airflow-dags
          readOnly: true
          subPath: repo/tests/dags
  hostNetwork: false
  restartPolicy: Never
  securityContext:
    runAsUser: 50000
  nodeSelector: {}
  affinity: {}
  tolerations: []
  serviceAccountName: 'RELEASE-NAME-worker-serviceaccount'
  volumes:
    - name: airflow-dags
      persistentVolumeClaim:
        claimName: RELEASE-NAME-dags
    - emptyDir: {}
      name: airflow-logs
    - configMap:
        name: RELEASE-NAME-airflow-config
      name: airflow-config
    - configMap:
        name: RELEASE-NAME-airflow-config
      name: airflow-local-settings
Then the SubDagOperator will follow the specified parallelism and run its tasks in parallel: because the worker pod runs a Local Executor, AIRFLOW__CORE__PARALLELISM caps how many task instances that executor runs at the same time.