
How to have a mix of both Celery Executor and Kubernetes Executor in Apache Airflow?

I have multiple DAGs using the Celery Executor, but I want one particular DAG to run using the Kubernetes Executor. I have not been able to find a good, reliable way to achieve this.

I have an airflow.cfg in which I have declared CeleryExecutor as the executor, and I don't want to change that globally, since it is needed for every DAG but this one.

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
executor = CeleryExecutor

My DAG code:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import \
    KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.utcnow(),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'kubernetes_sample_1', default_args=default_args)


start = DummyOperator(task_id='run_this_first', dag=dag)

# Runs `python -c "print('hello world')"` in a python:3.6 pod.
# (Docker image names and the interpreter binary are lowercase.)
passing = KubernetesPodOperator(namespace='default',
                                image="python:3.6",
                                cmds=["python", "-c"],
                                arguments=["print('hello world')"],
                                labels={"foo": "bar"},
                                name="passing-test",
                                task_id="passing-task",
                                get_logs=True,
                                dag=dag
                                )

# This task is intentionally broken (nonexistent image tag, capitalized
# command) so that it fails and demonstrates logs from a failing pod.
failing = KubernetesPodOperator(namespace='default',
                                image="ubuntu:1604",
                                cmds=["Python", "-c"],
                                arguments=["print('hello world')"],
                                labels={"foo": "bar"},
                                name="fail",
                                task_id="failing-task",
                                get_logs=True,
                                dag=dag
                                )

passing.set_upstream(start)
failing.set_upstream(start)

I could put in an if-else condition and change the executor value at the point where Airflow picks up its configuration. If that sounds right, please tell me which paths and files are involved. I was hoping for a more mature method, though, if one exists.

Aviral Srivastava asked May 27 '19 06:05


People also ask

What is Celery Kubernetes executor?

The CeleryKubernetesExecutor allows users to run a CeleryExecutor and a KubernetesExecutor simultaneously. The executor used to run a task is chosen based on the task's queue.

How do I set up Airflow with Celery executor?

To set up the Airflow Celery Executor, you first need to set up an Airflow Celery backend using a message broker service such as RabbitMQ or Redis. After that, change the airflow.cfg file to point the executor parameter to CeleryExecutor and enter all the required configuration for it.
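
A minimal airflow.cfg sketch of that setup (the broker and result-backend connection strings below are placeholders, not values from this question):

[core]
executor = CeleryExecutor

[celery]
# Placeholder endpoints; point these at your own Redis/RabbitMQ and database.
broker_url = redis://redis:6379/0
result_backend = db+postgresql://airflow:airflow@postgres/airflow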

What is Celery executor in Airflow?

CeleryExecutor is one of the ways you can scale out the number of workers. For this to work, you need to set up a Celery backend (RabbitMQ, Redis, …) and change your airflow.cfg to point the executor parameter to CeleryExecutor and provide the related Celery settings.


2 Answers

There is now the CeleryKubernetesExecutor (I can't tell exactly when it was introduced), which requires setting up both Celery and Kubernetes, but offers the functionality of both executors.

The official documentation offers a rule of thumb for deciding when it's worth using:

We recommend considering the CeleryKubernetesExecutor when your use case meets:

- The number of tasks needed to be scheduled at the peak exceeds the scale that your Kubernetes cluster can comfortably handle.
- A relatively small portion of your tasks requires runtime isolation.
- You have plenty of small tasks that can be executed on Celery workers, but you also have resource-hungry tasks that are better run in predefined environments.
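
As an illustration, a minimal sketch of that mixed workload (assuming Airflow 2.x with executor = CeleryKubernetesExecutor and the default kubernetes queue name; the DAG id, task ids, and callables are made up):

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def crunch():
    # Stand-in for a resource-hungry job.
    print('crunching')

with DAG(
    dag_id='mixed_workload_example',
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
) as dag:
    # Small task: default queue, picked up by a Celery worker.
    light = PythonOperator(task_id='light',
                           python_callable=lambda: print('quick'))
    # Resource-hungry task: queue='kubernetes' hands it to the
    # KubernetesExecutor, which runs it in its own pod.
    heavy = PythonOperator(task_id='heavy',
                           python_callable=crunch,
                           queue='kubernetes')
    light >> heavy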

Alessandro S. answered Oct 07 '22 10:10


Starting with Airflow 2.x, configure airflow.cfg as follows: in the [core] section set executor = CeleryKubernetesExecutor, and in the [celery_kubernetes_executor] section set kubernetes_queue = kubernetes. Then, whenever you want a task instance to run on the KubernetesExecutor, add the parameter queue='kubernetes' to the task definition. For example:

from airflow.operators.bash import BashOperator

# Inside a DAG definition:
task1 = BashOperator(
    task_id='Test_kubernetes_executor',
    bash_command='echo Kubernetes',
    queue='kubernetes',  # routed to the KubernetesExecutor
)
task2 = BashOperator(
    task_id='Test_Celery_Executor',
    bash_command='echo Celery',  # default queue, runs on a Celery worker
)
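
Collected in one place, the airflow.cfg changes described in this answer would look like:

[core]
executor = CeleryKubernetesExecutor

[celery_kubernetes_executor]
# Tasks sent to this queue run on the KubernetesExecutor;
# everything else stays on Celery.
kubernetes_queue = kubernetes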

On running the DAG you will see task1 executing on Kubernetes and task2 on Celery. Hence, unless you set a task's queue to kubernetes, the whole DAG runs on the Celery executor.

caxefaizan answered Oct 07 '22 10:10