
Airflow won't write logs to s3

I tried different ways to configure Airflow 1.9 to write logs to S3, but it just ignores them. I found a lot of people having problems reading the logs after doing so; my problem, however, is that the logs remain local. I can read them without a problem, but they are not in the specified S3 bucket.

First, I tried writing the settings into the airflow.cfg file:

# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
# must supply an Airflow connection id that provides access to the storage
# location.
remote_base_log_folder = s3://bucketname/logs
remote_log_conn_id = aws
encrypt_s3_logs = False

Then I tried setting the environment variables instead:

AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://bucketname/logs
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=aws
AIRFLOW__CORE__ENCRYPT_S3_LOGS=False

However, it gets ignored and the log files remain local.

I run Airflow from a container. I adapted https://github.com/puckel/docker-airflow to my case, but it won't write logs to S3. I use the aws connection to write to buckets in DAGs and that works, but the logs just remain local, no matter whether I run it on an EC2 instance or locally on my machine.
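For context, the variables are passed to the container roughly like this (a sketch only; the image tag and the webserver command are placeholders for my actual compose setup):

docker run -d \
    -e AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://bucketname/logs \
    -e AIRFLOW__CORE__REMOTE_LOG_CONN_ID=aws \
    -e AIRFLOW__CORE__ENCRYPT_S3_LOGS=False \
    puckel/docker-airflow:1.9.0 webserver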

Asked May 07 '18 by Yannick Widmer




1 Answer

I finally found an answer using https://stackoverflow.com/a/48969421/3808066, which covers most of the work; I then had to add one more step. I reproduce that answer here, adapted a bit to the way I did it:

Some things to check:

  1. Make sure you have the log_config.py file and it is in the correct dir: ./config/log_config.py.
  2. Make sure you didn't forget the __init__.py file in that dir.
  3. Make sure you defined the s3.task handler and set its formatter to airflow.task
  4. Make sure the airflow.task and airflow.task_runner loggers use the s3.task handler.
  5. Set task_log_reader = s3.task in airflow.cfg (see the sketch after this list).
  6. Pass S3_LOG_FOLDER to log_config. I did that using a variable and retrieving it as in the log_config.py below.
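For reference, the relevant airflow.cfg entries look roughly like this (a sketch: logging_config_class assumes the config/ directory containing log_config.py is on the PYTHONPATH, and s3_log_folder is simply the custom key that log_config.py reads; the bucket name is a placeholder):

[core]
task_log_reader = s3.task
logging_config_class = log_config.LOGGING_CONFIG
s3_log_folder = s3://bucketname/logs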

Here is a log_config.py that works:

import os

from airflow import configuration as conf


LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
LOG_FORMAT = conf.get('core', 'log_format')

BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')

FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'

S3_LOG_FOLDER = conf.get('core', 'S3_LOG_FOLDER')

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow.task': {
            'format': LOG_FORMAT,
        },
        'airflow.processor': {
            'format': LOG_FORMAT,
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'airflow.task',
            'stream': 'ext://sys.stdout'
        },
        'file.task': {
            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
        },
        'file.processor': {
            'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
            'formatter': 'airflow.processor',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        },
        's3.task': {
            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            's3_log_folder': S3_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
    },
    'loggers': {
        '': {
            'handlers': ['console'],
            'level': LOG_LEVEL
        },
        'airflow': {
            'handlers': ['console'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.processor': {
            'handlers': ['file.processor'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
        'airflow.task': {
            'handlers': ['s3.task'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task_runner': {
            'handlers': ['s3.task'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
    }
}

Note that this way S3_LOG_FOLDER can be specified in airflow.cfg or as the environment variable AIRFLOW__CORE__S3_LOG_FOLDER.
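Concretely, either of the following works (the bucket name is a placeholder):

# in airflow.cfg, under [core]
s3_log_folder = s3://bucketname/logs

# or exported in the environment before starting Airflow
AIRFLOW__CORE__S3_LOG_FOLDER=s3://bucketname/logs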

Answered Oct 04 '22 by Yannick Widmer