I am a newbie to Airflow. I'm trying to setup Distributed Mode of Airflow Using Celery Executor by Refering this article https://stlong0521.github.io/20161023%20-%20Airflow.html
Before getting into detail about the specification I would like to confirm that I've installed PostgreSQL on a seperate instance.
The specification of the setup is detailed below:
Airflow core/server computer
Configurations made in airflow.cfg:
sql_alchemy_conn = postgresql+psycopg2://username:[email protected]:5432/airflow
executor = CeleryExecutor
broker_url = amqp://username:[email protected]:5672//
celery_result_backend = db+postgresql://username:[email protected]:5432/airflow
Tests performed:
RabbitMQ is running
Can connect to PostgreSQL and have confirmed that Airflow has created tables
Can start and view the webserver (including custom dags)
Airflow worker computer
Has the following installed:
Configurations made in airflow.cfg are exactly the same as in the server:
sql_alchemy_conn = postgresql+psycopg2://username:[email protected]:5432/airflow
executor = CeleryExecutor
broker_url = amqp://username:[email protected]:5672//
celery_result_backend = db+postgresql://username:[email protected]:5432/airflow
Output from commands run on the worker machine:
When running airflow flower:
[2018-02-19 14:58:14,276] {__init__.py:57} INFO - Using executor CeleryExecutor
[2018-02-19 14:58:14,360] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/Grammar.txt
[2018-02-19 14:58:14,384] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/PatternGrammar.txt
[I 180219 14:58:15 command:139] Visit me at http://0.0.0.0:5555
[I 180219 14:58:15 command:144] Broker: amqp://username:[email protected]:5672//
[I 180219 14:58:15 command:147] Registered tasks:
['celery.accumulate',
'celery.backend_cleanup',
'celery.chain',
'celery.chord',
'celery.chord_unlock',
'celery.chunks',
'celery.group',
'celery.map',
'celery.starmap']
[I 180219 14:58:15 mixins:224] Connected to amqp://username:[email protected]:5672//
I am passing the dag in the Airflow Core machine and also I have copied the sample data(Excel sheets) which the dag will process to the same core machine.
My worker log
raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command 'airflow run dag_name_x task_name_xx 2018-02-19T10:15:41.657243 --local -sd /home/Distributedici/airflow/dags/sample_data_xx.py' returned non-zero exit status 1
Now my query is
1) Should I copy the dag folder to the worker computer also
2) Right now, I have not copied the dag folder on the worker computer and I'm not able to see the worker process pick up the task.
Please point me where I am making a mistake and how to make the worker process pick up the process.
The worker is the node or processor which runs the actual tasks. The Airflow scheduler won't run any tasks but handle tasks over to the Executor. The Executor acts as a middle man to handle resource utilization and how to distribute work best.
The first time you run Airflow, it will create a file called airflow. cfg in your $AIRFLOW_HOME directory ( ~/airflow by default). This file contains Airflow's configuration and you can edit it to change any of the settings.
concurrency : This is the maximum number of task instances allowed to run concurrently across all active DAG runs for a given DAG. This allows you to set 1 DAG to be able to run 32 tasks at once, while another DAG might only be able to run 16 tasks at once.
Airflow Celery is a task queue that helps users scale and integrate with other languages. It comes with the tools and support you need to run such a system in production. Executors in Airflow are the mechanism by which users can run the task instances.
Some of the biggest pain points with Airflow come up around deployment and keeping DAG files and plugins in sync across your Airflow scheduler, Airflow webserver, and Celery worker nodes.
We've created an open source project called Astronomer Open that automates a Dockerized Airflow, Celery, and PostgreSQL with some other goodies baked in. The project was motivated by seeing so many people hit the same pain points creating a very similar setup.
For example, here's the Airflow Dockerfile: https://github.com/astronomer/astronomer/blob/master/docker/airflow/1.10.2/Dockerfile
And the docs: https://open.astronomer.io/
Full disclosure: This is a project I contribute to at work — we offer a paid enterprise edition as well that runs on Kubernetes (docs). That said, the Open Edition is totally free to use.
Your configuration files look okay. As you suspected, all workers do indeed require a copy of the DAG folder. You can use something like git
to keep them in sync and up to date.
A little late on this, but it might still help someone, as from the existing answers it looks like there is no way to share DAGs other then "manual" deployment (via git/scp etc.), while there is a way.
Airflow supports pickling (-p
parameter from the CLI or command: scheduler -p
in your docker-compose file), which allows to deploy the DAGs on the server/master, and have them serialized and sent to the workers (so you don't have to deploy DAGs in multiple places and you avoid issues with out-of-sync DAGs).
Pickling is compatible with CeleryExecutor
.
Pickling has some limitations that can bite you back, notably the actual code of classes and functions is not serialized (only the fully qualified name is), so there will be an error if you try to deserialize a DAG referring to code you don't have in the target environment. For more info on pickle you can have a look here: https://docs.python.org/3.3/library/pickle.html
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With