 

Adding extra celery configs to Airflow

Tags:

celery

airflow

Does anyone know where I can add extra Celery configs to the Airflow Celery executor? For instance, I want this property: http://docs.celeryproject.org/en/latest/userguide/configuration.html#worker-pool-restarts. How do I allow extra Celery properties?

Ace Haidrey asked Jan 03 '23 18:01



2 Answers

As of the just-released Airflow 1.9.0, this is configurable.

In airflow.cfg, the [celery] section contains this line:

# Import path for celery configuration options
celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG

Its value is the import path of a Python object (here, a dict defined in a module). The current default is defined in https://github.com/apache/incubator-airflow/blob/1.9.0/airflow/config_templates/default_celery.py
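The option's value is a dotted import path: everything before the last dot names a module, and the last part names an object inside that module. A minimal sketch of resolving such a path with importlib follows; `import_from_path` is a hypothetical helper name, and Airflow's own loading code may differ in detail. A stdlib path stands in for the Airflow one so the snippet runs anywhere.

```python
import importlib
from typing import Any


def import_from_path(dotted_path: str) -> Any:
    """Resolve a dotted path such as 'package.module.NAME' to the object NAME.

    Hypothetical helper for illustration: the module part is everything
    before the last dot, and the final part is looked up on that module.
    """
    module_path, _, attr_name = dotted_path.rpartition(".")
    if not module_path or not attr_name:
        raise ValueError(f"expected 'module.attribute', got {dotted_path!r}")
    module = importlib.import_module(module_path)
    return getattr(module, attr_name)


if __name__ == "__main__":
    # Inside Airflow the string would be something like
    # "airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG"
    # or "my_celery_config.CELERY_CONFIG"; a stdlib path is used here instead.
    join = import_from_path("os.path.join")
    print(join("dags", "example_dag.py"))
```

This is also why a custom module has to be importable by name, i.e. on the Python path of every Airflow process that reads the setting.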

If you need a setting that isn't configurable via that file, create a new module, say my_celery_config.py:

CELERY_CONFIG = {
    # ....
}

and put it in your AIRFLOW_HOME directory (i.e. alongside the dags/ folder), then set celery_config_options = my_celery_config.CELERY_CONFIG in airflow.cfg.
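Applied to the setting from the question, a minimal sketch of such a module could look like the following. It assumes the new lowercase Celery setting name `worker_pool_restarts` and that the default config also uses lowercase names (as the linked default_celery.py appears to); Celery refuses to mix old uppercase and new lowercase setting keys. The `DEFAULT_CELERY_CONFIG` dict below is a hypothetical stand-in with illustrative contents so the snippet runs without Airflow; in the real my_celery_config.py you would import it from `airflow.config_templates.default_celery` instead.

```python
from typing import Any, Dict

# Hypothetical stand-in for Airflow's DEFAULT_CELERY_CONFIG so this sketch runs
# without Airflow. In the real module, replace it with:
#   from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
DEFAULT_CELERY_CONFIG: Dict[str, Any] = {
    "worker_prefetch_multiplier": 1,
    "task_acks_late": True,
}

# New dict: every default key is copied first, then the extra setting is added.
# Keys listed later win, and DEFAULT_CELERY_CONFIG itself is not modified.
CELERY_CONFIG: Dict[str, Any] = {
    **DEFAULT_CELERY_CONFIG,
    "worker_pool_restarts": True,  # the setting asked about in the question
}
```

Per the Celery documentation, enabling `worker_pool_restarts` allows the worker pool to be restarted with the `pool_restart` remote control command; starting from a copy of the defaults keeps the broker, result backend and other settings Airflow normally supplies.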

Ash Berlin-Taylor answered Jan 06 '23 07:01



If you're running Airflow in Docker and want to change the Celery configuration, do the following:

  1. Create an Airflow config folder (if you don't have one already) at the same level as your dags folder, and add a custom Celery configuration file (e.g. custom_celery_config.py) to it.

  2. Change the default Celery configuration in custom_celery_config.py. This Python module should define a variable holding the default Celery configuration plus your changes to it. For example, if you'd like to change Celery's task_queues setting, custom_celery_config.py could look like this:

    from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
    from kombu import Exchange, Queue
    
    # Two direct-exchange queues with priority support
    # ('x-max-priority' is a RabbitMQ queue argument).
    CELERY_TASK_QUEUES = [
        Queue('task1', Exchange('task1', type='direct'), routing_key='task1', queue_arguments={'x-max-priority': 8}),
        Queue('task2', Exchange('task2', type='direct'), routing_key='task2', queue_arguments={'x-max-priority': 6}),
    ]
    
    # Start from Airflow's defaults and override only task_queues.
    CELERY_CONFIG = {
        **DEFAULT_CELERY_CONFIG,
        "task_queues": CELERY_TASK_QUEUES,
    }
    
  3. Mount the config folder in the docker-compose.yml:

    volumes:
        - /data/airflow/config:/opt/airflow/config
    
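  4. Set the Celery configuration option in the environment section of the Airflow services in docker-compose.yml. Because the config folder is mounted into the container at /opt/airflow/config (the config subfolder of AIRFLOW_HOME in the official image), Airflow can import custom_celery_config from it: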

    AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS: 'custom_celery_config.CELERY_CONFIG'
    
  5. Restart the Airflow services (webserver, scheduler, Celery workers, etc.) so they pick up the new configuration.

Reference: here.

For more information about Celery configuration options, see the Celery configuration documentation.
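Step 4 relies on Airflow's convention for overriding airflow.cfg options through environment variables, AIRFLOW__{SECTION}__{KEY} with double underscores between the parts, so AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS is the environment-variable form of the celery_config_options option in the [celery] section. A small sketch of that name mapping; `airflow_env_var` is a hypothetical helper for illustration only:

```python
def airflow_env_var(section: str, key: str) -> str:
    """Return the environment variable name that overrides `key` in the
    [section] of airflow.cfg, following the AIRFLOW__{SECTION}__{KEY} convention.

    Hypothetical helper for illustration only.
    """
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


if __name__ == "__main__":
    # The option set in step 4 of the Docker answer.
    print(airflow_env_var("celery", "celery_config_options"))
    # prints AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS
```

Setting the variable in docker-compose.yml avoids editing airflow.cfg inside the image; environment variables of this form take precedence over the file.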

tsveti_iko answered Jan 06 '23 09:01
