I have multiple DAGs using the CeleryExecutor, but I want one particular DAG to run using the KubernetesExecutor, and I have been unable to work out a good, reliable way to achieve this.
My airflow.cfg declares CeleryExecutor as the executor to use, and I don't want to change that, since every DAG but this one really needs it.
# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
executor = CeleryExecutor
My DAG code:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.utcnow(),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}
dag = DAG('kubernetes_sample_1', default_args=default_args)
start = DummyOperator(task_id='run_this_first', dag=dag)
passing = KubernetesPodOperator(
    namespace='default',
    image="python:3.6",           # image and binary names are lowercase
    cmds=["python", "-c"],
    arguments=["print('hello world')"],
    labels={"foo": "bar"},
    name="passing-test",
    task_id="passing-task",
    get_logs=True,
    dag=dag,
)
failing = KubernetesPodOperator(
    namespace='default',
    image="ubuntu:1604",          # intentionally misconfigured (bad tag, no Python) so this pod fails
    cmds=["Python", "-c"],
    arguments=["print('hello world')"],
    labels={"foo": "bar"},
    name="fail",
    task_id="failing-task",
    get_logs=True,
    dag=dag,
)
passing.set_upstream(start)
failing.set_upstream(start)
I could put in an if-else condition and change the value at the point where Airflow picks up the configuration. If that sounds right, please tell me which paths and files are involved. I was hoping for a more mature method, though, if one exists.
The CeleryKubernetesExecutor allows users to run a CeleryExecutor and a KubernetesExecutor simultaneously; the executor used for a given task is chosen based on the task's queue.
CeleryExecutor is one of the ways you can scale out the number of workers. For this to work, you need to set up a Celery backend (RabbitMQ, Redis, etc.) and change your airflow.cfg to point the executor parameter to CeleryExecutor, providing the related Celery settings.
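For reference, a minimal sketch of the related airflow.cfg entries, assuming Redis as the message broker and Postgres as the result backend (the connection strings here are placeholders, not your actual values):
[celery]
broker_url = redis://redis:6379/0
result_backend = db+postgresql://airflow:airflow@postgres/airflow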
Now there is also the CeleryKubernetesExecutor (introduced in Airflow 2.0), which requires setting up both Celery and Kubernetes, but offers the functionality of both.
In the official documentation, they offer a rule of thumb to decide when it's worth using it:
We recommend considering the CeleryKubernetesExecutor when your use case meets:
- The number of tasks needed to be scheduled at the peak exceeds the scale that your Kubernetes cluster can comfortably handle.
- A relatively small portion of your tasks requires runtime isolation.
- You have plenty of small tasks that can be executed on Celery workers, but you also have resource-hungry tasks that will be better off running in predefined environments.
Starting with Airflow 2.x, configure airflow.cfg as follows: in the [core] section set executor = CeleryKubernetesExecutor, and in the [celery_kubernetes_executor] section set kubernetes_queue = kubernetes.
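Put together, the relevant airflow.cfg entries are:
[core]
executor = CeleryKubernetesExecutor

[celery_kubernetes_executor]
kubernetes_queue = kubernetes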
Then, whenever you want a task instance to run on the Kubernetes executor, add the parameter queue = 'kubernetes' to the task definition. For example:
from airflow.operators.bash import BashOperator

# task1 is routed to the Kubernetes executor via its queue
task1 = BashOperator(
    task_id='Test_kubernetes_executor',
    bash_command='echo Kubernetes',
    queue='kubernetes',
)

# task2 has no queue set, so it stays on the Celery executor
task2 = BashOperator(
    task_id='Test_Celery_Executor',
    bash_command='echo Celery',
)
On running the DAG you will see task1 running on Kubernetes and task2 on Celery. So unless you set a task's queue to kubernetes, all tasks will run on the Celery executor.
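Coming back to the original question (one whole DAG on Kubernetes, everything else on Celery): since queue is a regular BaseOperator argument, you can put it in default_args so every task of that single DAG inherits it, with no per-task changes. A minimal sketch, assuming the default kubernetes_queue name from above (the DAG and task names here are made up for illustration):

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

# Every task in this DAG inherits queue='kubernetes' from default_args,
# so the whole DAG runs on the Kubernetes half of CeleryKubernetesExecutor;
# all other DAGs, which don't set a queue, stay on Celery.
with DAG(
    'kubernetes_only_dag',
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    default_args={'queue': 'kubernetes'},
) as dag:
    BashOperator(
        task_id='runs_on_k8s',
        bash_command='echo "running in a Kubernetes pod"',
    )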