Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Removing Airflow task logs

Tags:

airflow

I'm running 5 DAG's which have generated a total of about 6GB of log data in the base_log_folder over a months period. I just added a remote_base_log_folder but it seems it does not exclude logging to the base_log_folder.

Is there anyway to automatically remove old log files, rotate them or force airflow to not log on disk (base_log_folder) only in remote storage?

like image 845
jompa Avatar asked Apr 21 '17 17:04

jompa


People also ask

How do I turn off Airflow tasks?

Toggle the check boxes to the right of the run button to ignore dependencies, then click run. Show activity on this post. From the way Apache Airflow is built, you can write the logic/branches to determine which tasks to run. You cannot start task execution from any task in between.

Where are Airflow logs stored?

Users can specify the directory to place log files in airflow. cfg using base_log_folder . By default, logs are placed in the AIRFLOW_HOME directory.

How many tasks can Airflow handle?

concurrency : This is the maximum number of task instances allowed to run concurrently across all active DAG runs for a given DAG. This allows you to set 1 DAG to be able to run 32 tasks at once, while another DAG might only be able to run 16 tasks at once.


2 Answers

Please refer https://github.com/teamclairvoyant/airflow-maintenance-dags

This plugin has DAGs that can kill halted tasks and log-cleanups. You can grab the concepts and can come up with a new DAG that can cleanup as per your requirement.

like image 108
Vinod Vutpala Avatar answered Sep 20 '22 15:09

Vinod Vutpala


We remove the Task logs by implementing our own FileTaskHandler, and then pointing to it in the airflow.cfg. So, we overwrite the default LogHandler to keep only N task logs, without scheduling additional DAGs.

We are using Airflow==1.10.1.

[core] logging_config_class = log_config.LOGGING_CONFIG 

log_config.LOGGING_CONFIG

BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER') FOLDER_TASK_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}' FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'  LOGGING_CONFIG = {     'formatters': {},     'handlers': {         '...': {},         'task': {             'class': 'file_task_handler.FileTaskRotationHandler',             'formatter': 'airflow.job',             'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),             'filename_template': FILENAME_TEMPLATE,             'folder_task_template': FOLDER_TASK_TEMPLATE,             'retention': 20         },         '...': {}     },     'loggers': {         'airflow.task': {             'handlers': ['task'],             'level': JOB_LOG_LEVEL,             'propagate': False,         },         'airflow.task_runner': {             'handlers': ['task'],             'level': LOG_LEVEL,             'propagate': True,         },         '...': {}     } } 

file_task_handler.FileTaskRotationHandler

import os import shutil  from airflow.utils.helpers import parse_template_string from airflow.utils.log.file_task_handler import FileTaskHandler   class FileTaskRotationHandler(FileTaskHandler):      def __init__(self, base_log_folder, filename_template, folder_task_template, retention):         """         :param base_log_folder: Base log folder to place logs.         :param filename_template: template filename string.         :param folder_task_template: template folder task path.         :param retention: Number of folder logs to keep         """         super(FileTaskRotationHandler, self).__init__(base_log_folder, filename_template)         self.retention = retention         self.folder_task_template, self.folder_task_template_jinja_template = \             parse_template_string(folder_task_template)      @staticmethod     def _get_directories(path='.'):         return next(os.walk(path))[1]      def _render_folder_task_path(self, ti):         if self.folder_task_template_jinja_template:             jinja_context = ti.get_template_context()             return self.folder_task_template_jinja_template.render(**jinja_context)          return self.folder_task_template.format(dag_id=ti.dag_id, task_id=ti.task_id)      def _init_file(self, ti):         relative_path = self._render_folder_task_path(ti)         folder_task_path = os.path.join(self.local_base, relative_path)         subfolders = self._get_directories(folder_task_path)         to_remove = set(subfolders) - set(subfolders[-self.retention:])          for dir_to_remove in to_remove:             full_dir_to_remove = os.path.join(folder_task_path, dir_to_remove)             print('Removing', full_dir_to_remove)             shutil.rmtree(full_dir_to_remove)          return FileTaskHandler._init_file(self, ti) 
like image 27
Franzi Avatar answered Sep 16 '22 15:09

Franzi