I'm running 5 DAGs, which have generated a total of about 6GB of log data in the base_log_folder over a month's period. I just added a remote_base_log_folder, but it seems that this does not stop logging to the base_log_folder.
Is there any way to automatically remove old log files, rotate them, or force Airflow to log only to remote storage and not to disk (base_log_folder)?
Toggle the check boxes to the right of the Run button to ignore dependencies, then click Run. Given the way Apache Airflow is built, you can write the logic/branches that determine which tasks run, but you cannot start task execution from an arbitrary task in the middle of a DAG.
Users can specify the directory for log files in airflow.cfg using base_log_folder. By default, logs are placed in the logs folder inside the AIRFLOW_HOME directory.
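A minimal sketch of where logs end up when base_log_folder is left at its default. It assumes the stock airflow.cfg value `base_log_folder = {AIRFLOW_HOME}/logs` and that AIRFLOW_HOME falls back to `~/airflow` when the environment variable is unset; the helper name `default_base_log_folder` is hypothetical and is not part of Airflow.

```python
import os


def default_base_log_folder(environ=None):
    """Hypothetical helper: resolve the default task log folder.

    Assumes the stock setting base_log_folder = {AIRFLOW_HOME}/logs and
    that AIRFLOW_HOME defaults to ~/airflow when the variable is unset.
    """
    if environ is None:
        environ = os.environ
    airflow_home = environ.get("AIRFLOW_HOME", "~/airflow")
    return os.path.join(os.path.expanduser(airflow_home), "logs")


print(default_base_log_folder({"AIRFLOW_HOME": "/opt/airflow"}))
# -> /opt/airflow/logs (on POSIX systems)
```

Pointing base_log_folder at a dedicated volume does not reduce the amount written, but it keeps a growing log folder from filling the disk that holds AIRFLOW_HOME.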
concurrency: the maximum number of task instances allowed to run concurrently across all active DAG runs of a given DAG. This lets you allow one DAG to run 32 tasks at once while another DAG can run only 16 tasks at once.
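The per-DAG cap described above can be modelled in a few lines. This is an illustrative sketch, not Airflow's scheduler code: the function name and arguments are hypothetical, and the real scheduler also weighs other limits such as pools and the global parallelism setting.

```python
def tasks_allowed_to_start(running, candidates, concurrency):
    """Pick which queued task instances of ONE DAG may start now.

    running:     task instances of this DAG already running, counted
                 across all of its active DAG runs
    candidates:  queued task instances of this DAG, in priority order
    concurrency: the DAG-level cap (e.g. 16 or 32)
    """
    free_slots = max(0, concurrency - running)
    return candidates[:free_slots]


# A DAG capped at 16 with 14 tasks already running can start only 2 more.
print(tasks_allowed_to_start(14, ["a", "b", "c"], 16))  # ['a', 'b']
```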
Please refer to https://github.com/teamclairvoyant/airflow-maintenance-dags
This repository has DAGs that can kill halted tasks and clean up logs. You can take the concepts from it and write your own DAG that cleans up logs according to your requirements.
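A minimal sketch of the core step such a cleanup DAG could run, for example from a PythonOperator on a daily schedule. The function name, its parameters and the `*.log` filter are assumptions for illustration; this is not the code from the linked repository.

```python
import os
import time


def delete_old_logs(base_log_folder, max_age_days, dry_run=True, now=None):
    """Hypothetical helper: remove *.log files older than max_age_days.

    Returns the paths that were removed (or, with dry_run=True, that
    would be removed). When not a dry run, empty folders left behind
    are pruned too, but base_log_folder itself is always kept.
    """
    now = time.time() if now is None else now
    cutoff = now - max_age_days * 24 * 60 * 60
    removed = []
    for dirpath, _dirnames, filenames in os.walk(base_log_folder):
        for name in filenames:
            path = os.path.join(dirpath, name)
            if name.endswith(".log") and os.path.getmtime(path) < cutoff:
                removed.append(path)
                if not dry_run:
                    os.remove(path)
    if not dry_run:
        # Walk bottom-up so child folders are emptied before their parents.
        for dirpath, _dirnames, _filenames in os.walk(base_log_folder, topdown=False):
            if dirpath != base_log_folder and not os.listdir(dirpath):
                os.rmdir(dirpath)
    return removed
```

Running it with dry_run=True first shows what would be deleted before anything is actually removed.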
We remove task logs by implementing our own FileTaskHandler and then pointing to it in airflow.cfg. In other words, we override the default log handler so that it keeps only the N most recent logs for each task, without scheduling any additional DAGs.
We are using Airflow==1.10.1.
airflow.cfg:

[core]
logging_config_class = log_config.LOGGING_CONFIG
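The setting above is a dotted path: the module part (log_config) has to be importable from sys.path, and the last component (LOGGING_CONFIG) is looked up on that module. The sketch below shows roughly what resolving such a path involves using importlib; the function `resolve_dotted_path` is hypothetical and is not Airflow's own loader.

```python
import importlib


def resolve_dotted_path(dotted_path):
    """Hypothetical helper: import 'package.module.ATTR' and return ATTR."""
    module_path, _, attr_name = dotted_path.rpartition(".")
    if not module_path:
        raise ValueError("expected 'module.attribute', got %r" % dotted_path)
    module = importlib.import_module(module_path)  # fails if not on sys.path
    return getattr(module, attr_name)


# A stdlib path stands in for log_config.LOGGING_CONFIG here.
join = resolve_dotted_path("os.path.join")
```

If log_config.py is not on the Python path of every Airflow process, the import fails, so that is the first thing to check when the custom handler is not picked up.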
log_config.LOGGING_CONFIG (log_config.py):
import os

from airflow import configuration as conf

BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
# JOB_LOG_LEVEL is not defined in the original snippet; reusing LOG_LEVEL is an assumption.
JOB_LOG_LEVEL = LOG_LEVEL

FOLDER_TASK_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}'
FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'

# '...' entries stand for the parts of the default logging config elided here.
LOGGING_CONFIG = {
    'formatters': {},
    'handlers': {
        '...': {},
        'task': {
            'class': 'file_task_handler.FileTaskRotationHandler',
            'formatter': 'airflow.job',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
            'folder_task_template': FOLDER_TASK_TEMPLATE,
            'retention': 20,  # number of run folders to keep per task
        },
        '...': {},
    },
    'loggers': {
        'airflow.task': {
            'handlers': ['task'],
            'level': JOB_LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task_runner': {
            'handlers': ['task'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
        '...': {},
    },
}
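To see the folder layout the retention handler works on, here is a plain-Python stand-in for FILENAME_TEMPLATE above. It is a sketch that uses os.path.join instead of Jinja, and the function name `render_task_log_path` is hypothetical. Each task gets base_log_folder/dag_id/task_id (the FOLDER_TASK_TEMPLATE part), with one sub-folder per run timestamp holding one file per try.

```python
import os


def render_task_log_path(base_log_folder, dag_id, task_id, ts, try_number):
    """Hypothetical mirror of
    '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'."""
    return os.path.join(base_log_folder, dag_id, task_id, ts, "%d.log" % try_number)


path = render_task_log_path(
    "/opt/airflow/logs", "etl", "extract", "2019-01-02T00:00:00+00:00", 1)
# On POSIX: /opt/airflow/logs/etl/extract/2019-01-02T00:00:00+00:00/1.log
```

Because the run folders are ISO-8601 timestamps, sorting their names (within a single timezone) puts them in chronological order, which is what makes "keep the last N folders" meaningful.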
file_task_handler.FileTaskRotationHandler (file_task_handler.py):
import os
import shutil

from airflow.utils.helpers import parse_template_string
from airflow.utils.log.file_task_handler import FileTaskHandler


class FileTaskRotationHandler(FileTaskHandler):
    """FileTaskHandler that keeps only the newest `retention` run folders per task."""

    def __init__(self, base_log_folder, filename_template, folder_task_template, retention):
        """
        :param base_log_folder: Base log folder to place logs.
        :param filename_template: template filename string.
        :param folder_task_template: template folder task path.
        :param retention: Number of folder logs to keep
        """
        super(FileTaskRotationHandler, self).__init__(base_log_folder, filename_template)
        self.retention = retention
        self.folder_task_template, self.folder_task_template_jinja_template = \
            parse_template_string(folder_task_template)

    @staticmethod
    def _get_directories(path='.'):
        # os.walk yields nothing for a folder that does not exist yet (the
        # first run of a task), so fall back to an empty list. os.walk returns
        # names in arbitrary order; sorting the {{ ts }} folder names puts them
        # in chronological order (ISO-8601 timestamps, single timezone).
        for _, dirnames, _ in os.walk(path):
            return sorted(dirnames)
        return []

    def _render_folder_task_path(self, ti):
        if self.folder_task_template_jinja_template:
            jinja_context = ti.get_template_context()
            return self.folder_task_template_jinja_template.render(**jinja_context)
        return self.folder_task_template.format(dag_id=ti.dag_id, task_id=ti.task_id)

    def _init_file(self, ti):
        relative_path = self._render_folder_task_path(ti)
        folder_task_path = os.path.join(self.local_base, relative_path)
        subfolders = self._get_directories(folder_task_path)
        # Delete all but the newest `retention` folders; a non-positive
        # retention disables pruning.
        to_remove = subfolders[:-self.retention] if self.retention > 0 else []
        for dir_to_remove in to_remove:
            full_dir_to_remove = os.path.join(folder_task_path, dir_to_remove)
            print('Removing', full_dir_to_remove)
            shutil.rmtree(full_dir_to_remove)
        return super(FileTaskRotationHandler, self)._init_file(ti)
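The retention step in `_init_file` above can be checked without an Airflow installation. This is a standalone sketch of the same idea; the function name `prune_run_folders` is hypothetical, and it assumes the run folders are named with ISO-8601 `ts` strings in one timezone, so that sorting by name sorts by time.

```python
import os
import shutil


def prune_run_folders(folder_task_path, retention):
    """Hypothetical helper: keep only the `retention` newest run folders.

    Returns the names of the folders that were removed. A non-positive
    retention disables pruning, matching the handler above.
    """
    if not os.path.isdir(folder_task_path):
        return []  # first run of this task: nothing to prune yet
    subfolders = sorted(
        name for name in os.listdir(folder_task_path)
        if os.path.isdir(os.path.join(folder_task_path, name))
    )
    to_remove = subfolders[:-retention] if retention > 0 else []
    for name in to_remove:
        shutil.rmtree(os.path.join(folder_task_path, name))
    return to_remove
```

Note that the handler prunes before the current run's folder is created, so right after a new run starts a task can briefly have `retention + 1` run folders on disk.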