Airflow Scheduler out of memory problems

We are experimenting with Apache Airflow (version 1.10rc2, with Python 2.7) and deploying it to Kubernetes, with the webserver and scheduler in separate pods and the database on Cloud SQL, but we have been facing out-of-memory problems with the scheduler pod.

At the moment of the OOM we were running only 4 example DAGs (approximately 20 tasks). The memory limit for the pod is 1 GiB. I've seen in other posts that a running task might consume approximately 50 MiB of memory, and all task operations happen in memory, nothing is flushed to disk, so 20 tasks at 50 MiB each would already add up to about 1 GiB.

Is there any rule of thumb we can use to calculate how much memory we would need for the scheduler based on the number of parallel tasks?

Is there any tuning, apart from decreasing the parallelism, that could be done to reduce memory use in the scheduler itself?

I don't think our use case would require Dask or Celery to horizontally scale Airflow with more worker machines.

Just a few more details about the configuration (see the airflow.cfg sketch after the list):

executor = LocalExecutor
parallelism = 10
dag_concurrency = 5
max_active_runs_per_dag = 2
workers = 1
worker_concurrency = 16
min_file_process_interval = 1
min_file_parsing_loop_time = 5
dag_dir_list_interval = 30
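For reference, this is roughly where those settings live in airflow.cfg for Airflow 1.10 (values copied from the list above; the placement of worker_concurrency is an assumption, and it only applies to the CeleryExecutor):

[core]
executor = LocalExecutor
parallelism = 10
dag_concurrency = 5
max_active_runs_per_dag = 2

[scheduler]
min_file_process_interval = 1
min_file_parsing_loop_time = 5
dag_dir_list_interval = 30

[celery]
# Only consulted by the CeleryExecutor, so it has no effect here.
worker_concurrency = 16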

The DAGs running at the time were example_bash_operator, example_branch_operator, example_python_operator and one quickDag we have developed.

All of them have only simple tasks/operators: DummyOperators, BranchOperators, BashOperators that in some cases just run echo or sleep, and PythonOperators that also just sleep. In total that is approximately 40 tasks, but not all of them run in parallel, because some are downstream of others, there are dependencies and so on, and our parallelism is set to 10 with just a single worker as described above, and dag_concurrency is set to 5.
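For context, a minimal sketch of the kind of lightweight DAG being described here; the DAG id, task ids and sleep duration below are illustrative assumptions, not the actual quickDag:

import time
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator


def sleep_a_bit():
    # Python task that only sleeps, like the ones in the question.
    time.sleep(5)


dag = DAG(
    dag_id='quick_dag_sketch',          # assumed name
    start_date=datetime(2018, 8, 1),
    schedule_interval=None,
)

start = DummyOperator(task_id='start', dag=dag)
echo = BashOperator(task_id='echo', bash_command='echo hello', dag=dag)
nap = PythonOperator(task_id='nap', python_callable=sleep_a_bit, dag=dag)
end = DummyOperator(task_id='end', dag=dag)

start >> echo >> end
start >> nap >> end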

I can't see anything abnormal in the Airflow logs, nor in the task logs.

Running just one of these DAGs, Airflow seems to work as expected.

I can see a lot of scheduler processes in the scheduler pod, each one using 0.2% of memory or more:

PID     USER    PR NI VIRT   RES    SHR   S %CPU %MEM TIME+     COMMAND
461384  airflow 20 0  836700 127212 23908 S 36.5 0.4  0:01.19   /usr/bin/python /usr/bin/airflow scheduler
461397  airflow 20 0  356168 86320  5044  R 14.0 0.3  0:00.42   /usr/bin/python /usr/bin/airflow scheduler
44      airflow 20 0  335920 71700  10600 S 28.9 0.2  403:32.05 /usr/bin/python /usr/bin/airflow scheduler
56      airflow 20 0  330548 59164  3524  S 0.0  0.2  0:00.02

And this is one of the tasks running using 0.3% of memory:

PID     USER    PR NI VIRT   RES   SHR   S %CPU %MEM TIME+   COMMAND
462042  airflow 20 0  282632 91120 10544 S 1.7  0.3  0:02.66 /usr/bin/python /usr/bin/airflow run example_bash_operator runme_1 2018-08-29T07:39:48.193735+00:00 --local -sd /usr/lib/python2.7/site-packages/apache_airflow-1.10.0-py2.7.egg/airflow/example_dags/example_bash_operator.py

Asked Aug 28 '18 by FERNANDO SOUZA




1 Answer

There isn't really a concise rule of thumb to follow because it can vary so much based on your workflow.

As you've seen, the scheduler forks several child processes. Also, every task (except Dummy ones) runs in its own process. Depending on the operator and the data it's processing, the amount of memory needed per task can vary wildly.

The parallelism setting will directly limit how many tasks are running simultaneously across all DAG runs/tasks, which will have the most dramatic effect for you since you're using the LocalExecutor. You can also try setting max_threads under [scheduler] to 1.
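For example, the two knobs mentioned here sit in airflow.cfg like this (the values are illustrative, not a recommendation):

[core]
executor = LocalExecutor
# Hard cap on task instances running at the same time across all DAGs;
# each running task is its own process, so this bounds memory directly.
parallelism = 4

[scheduler]
# Fewer scheduler threads means a smaller scheduler footprint,
# at the cost of slower scheduling.
max_threads = 1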

So a (very) general rule of thumb, being generous with resources:

[256 MB for the scheduler itself] + ( [parallelism] * (100 MB + [size of data you'll process]) )

Where the size of data will need to change depending on whether you load a full dataset or process chunks of it over the course of the task's execution.
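A quick sketch of that estimate in Python (the 256 MB and 100 MB constants come straight from the rule of thumb above; data_mb_per_task is whatever your tasks hold in memory):

def estimate_scheduler_pod_memory_mb(parallelism, data_mb_per_task):
    # [256 MB for the scheduler itself] + parallelism * (100 MB + data per task)
    scheduler_overhead_mb = 256
    return scheduler_overhead_mb + parallelism * (100 + data_mb_per_task)

# With the question's parallelism of 10 and tasks that hold almost no data:
print(estimate_scheduler_pod_memory_mb(10, 0))  # 1256 MB -- already over a 1 GiB pod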

Even if you don't think you'll need to scale your cluster, I would still recommend using the CeleryExecutor, if only to isolate the scheduler and tasks from each other. That way if your scheduler or celery worker dies, it doesn't take both down. Especially running in k8, if your scheduler sigterms it's going to kill it along with any running tasks. If you run them in different pods and the scheduler pod restarts, you're tasks you can finish uninterrupted. If you have more workers, it would lessen the impact of memory/processing spikes from other tasks.
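A sketch of what that switch looks like in airflow.cfg, assuming a Redis broker and a MySQL result backend; the connection strings below are placeholders for your own services:

[core]
executor = CeleryExecutor

[celery]
# Placeholder endpoints -- point these at your own broker and metadata DB.
broker_url = redis://redis-service:6379/0
result_backend = db+mysql://airflow:airflow@cloudsql-proxy:3306/airflow
worker_concurrency = 16

Each worker then runs as its own pod (airflow worker), so a crash or restart there no longer takes the scheduler with it.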

Answered Oct 19 '22 by cwurtz