I’m setting up a distributed Airflow cluster where everything except the Celery workers runs on one host, and the processing is done on several other hosts. The Airflow 2.0 setup is configured using the YAML file given in the Airflow documentation https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml. In my initial tests the architecture worked nicely when I ran everything on the same host. The question is: how do I start the Celery workers on the remote hosts?
So far, I have tried to create a trimmed version of the above docker-compose file that starts only the Celery workers on the worker host and nothing else, but I run into issues with the database connection. In the trimmed version I changed the URLs so that they point to the host that runs the database and Redis.
The dags, logs and plugins directories and the PostgreSQL database are located on a shared drive that is visible to all hosts.
How should I do the configuration? Any ideas what to check (connections, etc.)? Here is the Celery worker docker-compose configuration:
---
version: '3'
x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
  environment:
    &airflow-common-env
    AIRFLOW_UID: 50000
    AIRFLOW_GID: 50000
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@<main_host>:8080/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@<main_host>:8080/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@<main_host>:6380/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    REDIS_PORT: 6380
  volumes:
    - /airflow/dev/dags:/opt/airflow/dags
    - /airflow/dev/logs:/opt/airflow/logs
    - /airflow/dev/plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
services:
  airflow-remote-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
EDIT 1: I'm still having some difficulties with the log files. It appears that sharing the log directory doesn't solve the issue of missing log files. I added the extra_hosts definition on the main node as suggested and opened port 8793 on the worker machine. The worker tasks fail with this log:
*** Log file does not exist: /opt/airflow/logs/tutorial/print_date/2021-07-01T13:57:11.087882+00:00/1.log
*** Fetching from: http://:8793/log/tutorial/print_date/2021-07-01T13:57:11.087882+00:00/1.log
*** Failed to fetch log file from worker. Unsupported URL protocol ''
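The "Fetching from" line has an empty host (http://:8793/...), which suggests the webserver had no worker hostname to build the URL from. A minimal sketch of that URL assembly, assuming the webserver joins the hostname recorded for the task instance, the worker log server port and the relative log path (roughly what Airflow 2.1's file task handler does); build_log_url and worker-01-hostname are illustrative placeholders, not Airflow APIs:

```python
from urllib.parse import urlsplit


def build_log_url(hostname: str, log_relative_path: str, port: int = 8793) -> str:
    """Hypothetical helper: join the recorded worker hostname, the worker
    log server port (8793 by default) and the relative log path."""
    return f"http://{hostname}:{port}/log/{log_relative_path}"


rel = "tutorial/print_date/2021-07-01T13:57:11.087882+00:00/1.log"

# No hostname recorded: the host part is empty, as in the log above.
broken = build_log_url("", rel)
print(broken)
print(urlsplit(broken).hostname)  # None, so there is nothing to connect to

# A hostname the webserver can resolve gives a usable URL.
ok = build_log_url("worker-01-hostname", rel)
print(urlsplit(ok).hostname, urlsplit(ok).port)  # worker-01-hostname 8793
```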
The CeleryExecutor can distribute tasks across multiple workers: it relies on a message broker to transfer the jobs from the main application to the Celery workers.
To set up the Airflow CeleryExecutor, you first need a message broker service such as RabbitMQ or Redis. After that, you need to change the airflow.cfg file (or the equivalent environment variables) to set the executor to CeleryExecutor and enter all the required configuration for it, such as the broker URL and the result backend.
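Those required settings can be sanity-checked before a node is started. A minimal sketch, assuming the configuration is passed as AIRFLOW__SECTION__KEY environment variables as in the docker-compose files in this thread; check_celery_settings is a hypothetical helper, not part of Airflow, and the broker check only covers the Redis and RabbitMQ URL schemes:

```python
from typing import List, Mapping

# Settings the CeleryExecutor setup in this thread relies on.
REQUIRED = (
    "AIRFLOW__CORE__EXECUTOR",
    "AIRFLOW__CORE__SQL_ALCHEMY_CONN",
    "AIRFLOW__CELERY__BROKER_URL",
    "AIRFLOW__CELERY__RESULT_BACKEND",
)


def check_celery_settings(env: Mapping[str, str]) -> List[str]:
    """Hypothetical helper: return a list of problems (empty if none found)."""
    problems = [f"missing {key}" for key in REQUIRED if not env.get(key)]
    executor = env.get("AIRFLOW__CORE__EXECUTOR")
    if executor and executor != "CeleryExecutor":
        problems.append(f"executor is {executor!r}, expected 'CeleryExecutor'")
    broker = env.get("AIRFLOW__CELERY__BROKER_URL", "")
    # Only the Redis and RabbitMQ URL schemes are recognised here.
    if broker and not broker.startswith(("redis://", "amqp://", "pyamqp://")):
        problems.append(f"unexpected broker URL scheme: {broker!r}")
    return problems


example = {
    "AIRFLOW__CORE__EXECUTOR": "CeleryExecutor",
    "AIRFLOW__CORE__SQL_ALCHEMY_CONN": "postgresql+psycopg2://airflow:airflow@main_node_ip_or_hostname:5432/airflow",
    "AIRFLOW__CELERY__BROKER_URL": "redis://:@main_node_ip_or_hostname:6379/0",
    "AIRFLOW__CELERY__RESULT_BACKEND": "db+postgresql://airflow:airflow@main_node_ip_or_hostname:5432/airflow",
}
print(check_celery_settings(example))  # []
```

On a real node you could pass os.environ instead of the example dictionary.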
Far from being the "ultimate set-up", these are some settings that worked for me using the docker-compose file from Airflow on the core node and on the workers:
The worker nodes have to be reachable from the main node where the Webserver runs. I found this diagram of the CeleryExecutor architecture to be very helpful to sort things out.
When the Webserver tries to read the logs and doesn't find them locally, it tries to retrieve them from the remote worker. However, your main node may not know the hostnames of your workers, so you either change how the workers' hostnames are determined (the hostname_callable setting, which defaults to socket.getfqdn) or simply add name resolution capability to the Webserver. This can be done by adding the extra_hosts config key to the x-airflow-common definition:
---
version: "3"
x-airflow-common: &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
  environment: &airflow-common-env
    # ... env vars
  extra_hosts:
    - "worker-01-hostname:worker-01-ip-address" # "worker-01-hostname:192.168.0.11"
    - "worker-02-hostname:worker-02-ip-address"
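Each extra_hosts entry is a static "hostname:ip" pair that Docker adds to the container's name resolution, so the Webserver can resolve a worker name without DNS. A minimal sketch of that lookup, assuming static entries are consulted before the system resolver (the usual hosts-file-first order); parse_extra_hosts and resolve are hypothetical helpers, and 192.168.0.12 is an illustrative address for the second worker:

```python
import socket
from typing import Dict, Iterable


def parse_extra_hosts(entries: Iterable[str]) -> Dict[str, str]:
    """Turn docker-compose style "hostname:ip" entries into a lookup table.

    Only the first ':' separates the name from the address, so IPv6
    addresses (which contain colons) are kept intact.
    """
    table = {}
    for entry in entries:
        name, sep, address = entry.partition(":")
        if not sep or not name or not address:
            raise ValueError(f"expected 'hostname:ip', got {entry!r}")
        table[name] = address
    return table


def resolve(hostname: str, extra_hosts: Dict[str, str]) -> str:
    """Static entries are checked first; otherwise use the system resolver."""
    if hostname in extra_hosts:
        return extra_hosts[hostname]
    return socket.gethostbyname(hostname)  # raises socket.gaierror if unknown


hosts = parse_extra_hosts([
    "worker-01-hostname:192.168.0.11",
    "worker-02-hostname:192.168.0.12",  # hypothetical second worker address
])
print(resolve("worker-01-hostname", hosts))  # 192.168.0.11
# On each worker, socket.getfqdn() is the name the default hostname_callable
# reports, so that is the name that needs an entry here.
```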
*Note that in your specific case, since you have a shared drive, I think the logs will be found locally.
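The lookup order (local file first, worker log server second) can be sketched as below. It assumes the shared drive is mounted at the same base log folder (/opt/airflow/logs in the question's compose file) in both the Webserver and worker containers, otherwise the relative paths will not line up; locate_log is a hypothetical helper, not Airflow's code:

```python
import tempfile
from pathlib import Path
from typing import Tuple


def locate_log(base_log_folder: str, log_relative_path: str,
               worker_hostname: str, port: int = 8793) -> Tuple[str, str]:
    """Return ("local", path) if the file exists under the base folder,
    otherwise ("remote", url) pointing at the worker's log server."""
    local = Path(base_log_folder) / log_relative_path
    if local.is_file():
        return "local", str(local)
    return "remote", f"http://{worker_hostname}:{port}/log/{log_relative_path}"


rel = "tutorial/print_date/2021-07-01T13:57:11.087882+00:00/1.log"
with tempfile.TemporaryDirectory() as shared_logs:
    # Nothing written yet: the Webserver would have to ask the worker.
    print(locate_log(shared_logs, rel, "worker-01-hostname")[0])  # remote
    # Once the worker writes the file to the shared drive, it is found locally.
    path = Path(shared_logs) / rel
    path.parent.mkdir(parents=True)
    path.write_text("task output\n")
    print(locate_log(shared_logs, rel, "worker-01-hostname")[0])  # local
```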
Define the parallelism and concurrency settings:
x-airflow-common: &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
  environment: &airflow-common-env
    AIRFLOW__CORE__PARALLELISM: 64
    AIRFLOW__CORE__DAG_CONCURRENCY: 32
    AIRFLOW__SCHEDULER__PARSING_PROCESSES: 4
Of course, the values to set depend on your specific case and available resources. This article has a good overview of the subject. DAG-level settings can also be overridden in the DAG definition.
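The keys in these snippets follow Airflow's AIRFLOW__{SECTION}__{KEY} convention for setting airflow.cfg options through environment variables. A small sketch of that mapping; env_to_option is a hypothetical helper, and it ignores the extra suffix forms Airflow also accepts:

```python
from typing import Optional, Tuple


def env_to_option(name: str) -> Optional[Tuple[str, str]]:
    """Map AIRFLOW__SECTION__KEY to the (section, key) pair used in airflow.cfg.

    Returns None for variables that do not follow the convention,
    e.g. AIRFLOW_UID, which has a single underscore.
    """
    prefix = "AIRFLOW__"
    if not name.startswith(prefix):
        return None
    section, sep, key = name[len(prefix):].partition("__")
    if not sep or not section or not key:
        return None
    return section.lower(), key.lower()


for var in ("AIRFLOW__CORE__PARALLELISM",
            "AIRFLOW__CORE__DAG_CONCURRENCY",
            "AIRFLOW__SCHEDULER__PARSING_PROCESSES",
            "AIRFLOW_UID"):
    print(var, "->", env_to_option(var))
```

For example, AIRFLOW__CORE__PARALLELISM corresponds to parallelism in the [core] section.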
Define the worker concurrency with AIRFLOW__CELERY__WORKER_CONCURRENCY; a sensible default could be the number of CPUs available on the machine (docs).
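To see how these limits interact, here is a back-of-the-envelope sketch. It assumes a simplified model in which parallelism caps running tasks across the whole installation, dag_concurrency caps them per DAG, and each worker runs at most worker_concurrency tasks at once; pools and other limits are ignored. The function names and the worker counts are hypothetical:

```python
import os
from typing import Sequence


def default_worker_concurrency() -> int:
    """Number of CPUs on this machine as a starting point (falls back to 1)."""
    return os.cpu_count() or 1


def max_running_tasks(parallelism: int, worker_concurrency: Sequence[int]) -> int:
    """Upper bound on tasks running at once across the cluster (simplified model)."""
    return min(parallelism, sum(worker_concurrency))


def max_running_for_one_dag(parallelism: int, dag_concurrency: int,
                            worker_concurrency: Sequence[int]) -> int:
    """Upper bound for a single DAG, which is also limited by dag_concurrency."""
    return min(dag_concurrency, max_running_tasks(parallelism, worker_concurrency))


# parallelism 64 and dag_concurrency 32 as above; worker counts are hypothetical.
print(max_running_tasks(64, [8, 8]))               # 16: the two workers are the bottleneck
print(max_running_for_one_dag(64, 32, [8] * 10))   # 32: dag_concurrency is the bottleneck
```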
Define how to reach the services running on the main node. Set an IP address or hostname, and make sure the ports match those exposed on the main node:
x-airflow-common: &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
  environment: &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CELERY__WORKER_CONCURRENCY: 8
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@main_node_ip_or_hostname:5432/airflow # 5432 is the default Postgres port
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@main_node_ip_or_hostname:5432/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@main_node_ip_or_hostname:6379/0
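Before starting a worker, it can help to check from the worker host that each host and port in these URLs actually accepts TCP connections, which separates network problems from credential problems. A minimal standard-library sketch; endpoint and can_connect are hypothetical helpers, and main_node_ip_or_hostname is the placeholder used above:

```python
import socket
from typing import Tuple
from urllib.parse import urlsplit


def endpoint(url: str) -> Tuple[str, int]:
    """Extract (host, port) from a SQLAlchemy- or Celery-style URL."""
    parts = urlsplit(url)
    if not parts.hostname or parts.port is None:
        raise ValueError(f"no host/port in {url!r}")
    return parts.hostname, parts.port


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # covers refused connections, timeouts and unknown hosts
        return False


urls = {
    "AIRFLOW__CORE__SQL_ALCHEMY_CONN": "postgresql+psycopg2://airflow:airflow@main_node_ip_or_hostname:5432/airflow",
    "AIRFLOW__CELERY__RESULT_BACKEND": "db+postgresql://airflow:airflow@main_node_ip_or_hostname:5432/airflow",
    "AIRFLOW__CELERY__BROKER_URL": "redis://:@main_node_ip_or_hostname:6379/0",
}
for name, url in urls.items():
    host, port = endpoint(url)
    print(f"{name}: {host}:{port}")
    # With the real main-node address in place, check reachability:
    # print(can_connect(host, port))
```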
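Use the same Fernet key and webserver secret key on all nodes, for example by reading them from a .env file:
  environment: &airflow-common-env
    AIRFLOW__CORE__FERNET_KEY: ${FERNET_KEY}
    AIRFLOW__WEBSERVER__SECRET_KEY: ${SECRET_KEY}
  env_file:
    - .env
.env file:
FERNET_KEY=jvYUaxxxxxxxxxxxxx=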
It's critical that every node in the cluster (main and workers) has the same settings applied.
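One way to keep the nodes identical is to generate the keys once and compare each node's .env file against the main node's. A minimal sketch: a Fernet key is 32 random bytes in URL-safe base64 (the same format cryptography's Fernet.generate_key() produces), the secret key just needs to be a random string (token_hex is one choice), and the helper names and the tiny .env parser are hypothetical:

```python
import base64
import os
import secrets
from typing import Dict, List


def new_fernet_key() -> str:
    """32 random bytes, URL-safe base64 encoded: the format Fernet keys use."""
    return base64.urlsafe_b64encode(os.urandom(32)).decode()


def parse_env(text: str) -> Dict[str, str]:
    """Minimal .env parser: KEY=VALUE lines; comments and blank lines skipped."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")  # keeps a trailing '=' in the value
        values[key.strip()] = value.strip()
    return values


def mismatched_keys(main: Dict[str, str], worker: Dict[str, str]) -> List[str]:
    """Keys whose values differ, or that exist on only one node."""
    return sorted(k for k in main.keys() | worker.keys() if main.get(k) != worker.get(k))


shared = f"FERNET_KEY={new_fernet_key()}\nSECRET_KEY={secrets.token_hex(16)}\n"
main_env = parse_env(shared)
worker_env = parse_env(shared)                 # copied verbatim to the worker
print(mismatched_keys(main_env, worker_env))   # []
worker_env["FERNET_KEY"] = new_fernet_key()    # a worker with its own key
print(mismatched_keys(main_env, worker_env))   # ['FERNET_KEY']
```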
Give the worker service an explicit hostname, so that it doesn't get an autogenerated one matching the container ID.
Expose port 8793, which is the default port used to fetch the logs from the worker (docs):
services:
  airflow-worker:
    <<: *airflow-common
    hostname: ${HOSTNAME}
    ports:
      - 8793:8793
    command: celery worker
    restart: always
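With a stable hostname and port 8793 published, you can check from the main node that the worker's log server answers before relying on the Webserver. A minimal standard-library sketch; fetch_worker_log is a hypothetical helper, the /log/<relative path> layout is taken from the "Fetching from" message in the question, and the opener deliberately bypasses any HTTP proxy set in the environment because the worker sits on the internal network:

```python
import urllib.request


def fetch_worker_log(hostname: str, log_relative_path: str,
                     port: int = 8793, timeout: float = 5.0) -> str:
    """Fetch a task log directly from a worker's log server."""
    url = f"http://{hostname}:{port}/log/{log_relative_path}"
    # An empty ProxyHandler disables proxies taken from http_proxy and similar.
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    with opener.open(url, timeout=timeout) as response:
        return response.read().decode("utf-8", errors="replace")


# From the main node, using the hostname set in the worker's compose file:
# print(fetch_worker_log("worker-01-hostname",
#                        "tutorial/print_date/2021-07-01T13:57:11.087882+00:00/1.log"))
```

An HTTP error or timeout here points at the network, port mapping or hostname rather than at Airflow's own configuration.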
If you have heavy workloads and high concurrency in general, you may need to tune Postgres settings such as max_connections and shared_buffers. The same applies to host OS network settings such as ip_local_port_range or somaxconn.
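On Linux, the two network settings mentioned can be read from /proc/sys before deciding whether to tune them. A minimal sketch; read_network_limits and parse_port_range are hypothetical helpers, and some containers do not expose these files, which the last lines handle:

```python
from pathlib import Path
from typing import Tuple


def parse_port_range(text: str) -> Tuple[int, int]:
    """Parse the two numbers stored in net/ipv4/ip_local_port_range."""
    low, high = (int(part) for part in text.split())
    return low, high


def read_network_limits(proc: str = "/proc/sys") -> dict:
    """Read ip_local_port_range and somaxconn (Linux only)."""
    root = Path(proc)
    low, high = parse_port_range((root / "net/ipv4/ip_local_port_range").read_text())
    return {
        "ip_local_port_range": (low, high),
        "ephemeral_ports": high - low + 1,  # size of the local port range
        "somaxconn": int((root / "net/core/somaxconn").read_text()),
    }


try:
    print(read_network_limits())
except (OSError, ValueError) as exc:  # not Linux, or the files are not exposed
    print("could not read network limits:", exc)
```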
For any issue I encountered during the initial cluster setup, Flower and the worker execution logs always provided helpful details and error messages, both the task-level logs and the Docker Compose service log, i.e.: docker-compose logs --tail=10000 airflow-worker > worker_logs.log
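A dumped service log can get long, so counting lines by log level is a quick way to find where to look. A minimal sketch; it assumes the level name appears as a separate word in each line, which Airflow's and Celery's default log formats usually provide, and summarize and matching_lines are hypothetical helpers:

```python
import os
import re
from collections import Counter
from typing import Iterable, List

LEVEL = re.compile(r"\b(ERROR|CRITICAL|WARNING)\b")


def summarize(lines: Iterable[str]) -> Counter:
    """Count ERROR/CRITICAL/WARNING lines."""
    counts = Counter()
    for line in lines:
        match = LEVEL.search(line)
        if match:
            counts[match.group(1)] += 1
    return counts


def matching_lines(lines: Iterable[str], level: str = "ERROR", limit: int = 20) -> List[str]:
    """Return up to `limit` lines that mention the given level."""
    pattern = re.compile(rf"\b{re.escape(level)}\b")
    return [line.rstrip("\n") for line in lines if pattern.search(line)][:limit]


if os.path.exists("worker_logs.log"):
    with open("worker_logs.log", encoding="utf-8", errors="replace") as fh:
        lines = fh.readlines()
    print(summarize(lines))
    print("\n".join(matching_lines(lines)))
```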
Hope that works for you!