Anyone know where I can add extra celery configs to airflow celery executor? For instance I want http://docs.celeryproject.org/en/latest/userguide/configuration.html#worker-pool-restarts this property but how do I allow extra celery properties..
Use the just-released Airflow 1.9.0 and this is now configurable.
In airflow.cfg there is this line:
# Import path for celery configuration options
celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
which points to a python file from the import path. The current default version can is https://github.com/apache/incubator-airflow/blob/1.9.0/airflow/config_templates/default_celery.py
If you need a setting that isn't tweakable via that file then create a new module, say 'my_celery_config.py':
CELERY_CONFIG = {
# ....
}
and put it in your AIRFLOW_HOME dir (i.e. along side the dags/ folder) and then set celery_config_options = my_celery_config.CELERY_CONFIG
in the config.
In case you're running Airflow in Docker and you want to change the Celery configuration, you need to do the following:
Create an Airflow config
folder (if you don't have one already) at the same level where your dags
folder is and add a custom celery configuration file (e.g. custom_celery_config.py
) there.
Change the default Celery configuration in the custom_celery_config.py
. The idea is that this python script should contain a variable, which contains the default Celery configuration plus your changes to it. E.g. if you like to change the task_queues
configuration of Celery, your custom_celery_config.py
should look like this:
from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
from kombu import Exchange, Queue
CELERY_TASK_QUEUES = [
Queue('task1', Exchange('task1', type='direct'), routing_key='task1', queue_arguments={'x-max-priority': 8}),
Queue('task2', Exchange('task2', type='direct'), routing_key='task2', queue_arguments={'x-max-priority': 6}),
]
CELERY_CONFIG = {
**DEFAULT_CELERY_CONFIG,
"task_queues": CELERY_TASK_QUEUES
}
Mount the config
folder in the docker-compose.yml
:
volumes:
- /data/airflow/config:/opt/airflow/config
Set the Celery configuration in the docker-compose.yml
like this (since Docker can see the config
folder, it can access your custom_celery_config.py
):
AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS: 'custom_celery_config.CELERY_CONFIG'
Restart the Airflow Webserver, Scheduler etc.
Reference: here.
For more info about the Celery configuration check this documentation.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With