
Airflow Worker Configuration

I am a newbie to Airflow. I'm trying to set up Airflow in distributed mode using the Celery Executor, following this article: https://stlong0521.github.io/20161023%20-%20Airflow.html

Before getting into detail about the specification, I would like to confirm that I've installed PostgreSQL on a separate instance.

The specification of the setup is detailed below:

Airflow core/server computer

  • Python 3.5
    • airflow (AIRFLOW_HOME = ~/airflow)
    • celery
    • psycopg2
  • RabbitMQ

Configurations made in airflow.cfg:

sql_alchemy_conn = postgresql+psycopg2://username:[email protected]:5432/airflow
executor = CeleryExecutor
broker_url = amqp://username:[email protected]:5672//
celery_result_backend = db+postgresql://username:[email protected]:5432/airflow

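For context, the services on this machine are started with the standard Airflow CLI; a minimal sketch, assuming Airflow 1.x and the airflow.cfg above sitting in AIRFLOW_HOME:

# create/upgrade the metadata tables in the PostgreSQL backend
airflow initdb

# start the web UI and the scheduler on the core machine
airflow webserver -p 8080
airflow scheduler
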
Tests performed:

RabbitMQ is running
Can connect to PostgreSQL and have confirmed that Airflow has created tables
Can start and view the webserver (including custom dags)

Airflow worker computer

Has the following installed:

  • Python 3.5
    • airflow (AIRFLOW_HOME = ~/airflow)
    • celery
    • psycopg2

Configurations made in airflow.cfg are exactly the same as in the server:

sql_alchemy_conn = postgresql+psycopg2://username:[email protected]:5432/airflow
executor = CeleryExecutor
broker_url = amqp://username:[email protected]:5672//
celery_result_backend = db+postgresql://username:[email protected]:5432/airflow

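Not shown above, but for completeness: the Celery worker process itself would normally be started on this machine with the airflow worker command (a sketch, assuming Airflow 1.x defaults; the queue name is only an illustration):

airflow worker

# or, to listen on a specific queue only:
airflow worker -q default
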
Output from commands run on the worker machine:

When running airflow flower:

[2018-02-19 14:58:14,276] {__init__.py:57} INFO - Using executor CeleryExecutor
[2018-02-19 14:58:14,360] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/Grammar.txt
[2018-02-19 14:58:14,384] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/PatternGrammar.txt
[I 180219 14:58:15 command:139] Visit me at http://0.0.0.0:5555
[I 180219 14:58:15 command:144] Broker: amqp://username:[email protected]:5672//
[I 180219 14:58:15 command:147] Registered tasks: 
    ['celery.accumulate',
     'celery.backend_cleanup',
     'celery.chain',
     'celery.chord',
     'celery.chord_unlock',
     'celery.chunks',
     'celery.group',
     'celery.map',
     'celery.starmap']
[I 180219 14:58:15 mixins:224] Connected to amqp://username:[email protected]:5672//

I have deployed the DAG on the Airflow core machine, and I have also copied the sample data (Excel sheets) that the DAG will process to the same core machine.

My worker log raises:

raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command 'airflow run dag_name_x task_name_xx 2018-02-19T10:15:41.657243 --local -sd /home/Distributedici/airflow/dags/sample_data_xx.py' returned non-zero exit status 1

My questions are:

1) Should I copy the DAG folder to the worker computer as well?

2) Right now I have not copied the DAG folder to the worker computer, and the worker process does not pick up the task.

Please point out where I am making a mistake and how to get the worker process to pick up the task.

Asked Feb 20 '18 by Soundar Raj


People also ask

What is worker in Airflow?

The worker is the node or processor that runs the actual tasks. The Airflow scheduler does not run any tasks itself; it hands tasks over to the Executor. The Executor acts as a middleman that handles resource utilization and decides how best to distribute work.

How do you find the Airflow configuration?

The first time you run Airflow, it creates a file called airflow.cfg in your $AIRFLOW_HOME directory (~/airflow by default). This file contains Airflow's configuration, and you can edit it to change any of the settings.
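
In other words, a quick way to locate it (assuming the default setup from the question, where AIRFLOW_HOME = ~/airflow):

# AIRFLOW_HOME defaults to ~/airflow unless exported explicitly
echo $AIRFLOW_HOME

# the configuration file lives directly underneath it
ls ~/airflow/airflow.cfg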

How many tasks can an Airflow worker handle?

concurrency: the maximum number of task instances allowed to run concurrently across all active DAG runs for a given DAG. This lets you allow one DAG to run 32 tasks at once, while another DAG might only be able to run 16 tasks at once.
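
These limits correspond to keys in airflow.cfg; a hedged sketch of the relevant settings (key names as of Airflow 1.10; older versions call the Celery one celeryd_concurrency):

[core]
# maximum number of task instances that can run at once across the whole installation
parallelism = 32
# default limit on concurrently running task instances per DAG
dag_concurrency = 16

[celery]
# number of task instances a single Celery worker will run at once
worker_concurrency = 16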

What is Celery worker Airflow?

Celery is a task queue that helps Airflow scale out and integrate with other languages. It comes with the tools and support you need to run such a system in production. Executors in Airflow are the mechanism by which task instances get run.



3 Answers

Some of the biggest pain points with Airflow come up around deployment and keeping DAG files and plugins in sync across your Airflow scheduler, Airflow webserver, and Celery worker nodes.

We've created an open source project called Astronomer Open that automates a Dockerized Airflow, Celery, and PostgreSQL with some other goodies baked in. The project was motivated by seeing so many people hit the same pain points creating a very similar setup.

For example, here's the Airflow Dockerfile: https://github.com/astronomer/astronomer/blob/master/docker/airflow/1.10.2/Dockerfile

And the docs: https://open.astronomer.io/

Full disclosure: This is a project I contribute to at work — we offer a paid enterprise edition as well that runs on Kubernetes (docs). That said, the Open Edition is totally free to use.

Answered Oct 22 '22 by Taylor D. Edmiston


Your configuration files look okay. As you suspected, all workers do indeed require a copy of the DAG folder. You can use something like git to keep them in sync and up to date.
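
As a sketch of what that could look like, assuming the DAGs live in a git repository checked out to the same path on every machine (the path below just mirrors the one in the question, and the one-minute cron schedule is only an illustration):

# airflow.cfg on the scheduler, webserver, and every worker:
[core]
dags_folder = /home/Distributedici/airflow/dags

# crontab entry on each machine, refreshing the checkout every minute:
* * * * * cd /home/Distributedici/airflow/dags && git pull --quiet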

Answered Oct 22 '22 by Daniel Huang


A little late on this, but it might still help someone. From the existing answers it looks like there is no way to share DAGs other than "manual" deployment (via git/scp etc.), but there is a way.

Airflow supports pickling (the -p parameter from the CLI, or command: scheduler -p in your docker-compose file), which lets you deploy the DAGs on the server/master and have them serialized and sent to the workers, so you don't have to deploy DAGs in multiple places and you avoid issues with out-of-sync DAGs.

Pickling is compatible with CeleryExecutor.

Pickling has some limitations that can bite you. Notably, the actual code of classes and functions is not serialized (only the fully qualified name is), so there will be an error if you try to deserialize a DAG that refers to code you don't have in the target environment. For more info on pickle, see https://docs.python.org/3.3/library/pickle.html
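
A minimal sketch of what enabling this looks like with the Airflow 1.x CLI (where -p is shorthand for --do_pickle on the scheduler command):

# scheduler pickles each DAG and stores it in the metadata DB for the workers
airflow scheduler --do_pickle

# workers pick up tasks as usual; the pickled DAG is fetched instead of a local file
airflow worker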

Answered Oct 22 '22 by Alessandro S.