 

Format Airflow Logs in JSON

I have a requirement to log the Apache Airflow logs to stdout in JSON format. Airflow does not seem to provide this capability out of the box. I have found a couple of Python modules that can do the job, but I cannot get the implementation to work.

Currently, I am adding a class to airflow/utils/logging.py to modify the logger, shown below:

import datetime

from pythonjsonlogger import jsonlogger

class StackdriverJsonFormatter(jsonlogger.JsonFormatter, object):
    def __init__(self, fmt="%(levelname) %(asctime) %(nanotime) %(severity) %(message)", style='%', *args, **kwargs):
        jsonlogger.JsonFormatter.__init__(self, fmt=fmt, *args, **kwargs)

    def process_log_record(self, log_record):
        # Rename 'level'/'levelname' to 'severity' for Stackdriver
        if log_record.get('level'):
            log_record['severity'] = log_record['level']
            del log_record['level']
        else:
            log_record['severity'] = log_record['levelname']
            del log_record['levelname']
        if log_record.get('asctime'):
            log_record['timestamp'] = log_record['asctime']
            del log_record['asctime']
        now = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%fZ')
        log_record['nanotime'] = now
        return super(StackdriverJsonFormatter, self).process_log_record(log_record)

I am implementing this code in /airflow/settings.py as shown below:

import sys

from airflow.utils import logging as logconf

def configure_logging(log_format=LOG_FORMAT):
    handler = logconf.logging.StreamHandler(sys.stdout)
    formatter = logconf.StackdriverJsonFormatter()
    handler.setFormatter(formatter)
    root_logger = logconf.logging.getLogger()
    root_logger.addHandler(handler)
    # code below was the original airflow source code
    # logging.root.handlers = []
    # logging.basicConfig(
    #     format=log_format, stream=sys.stdout, level=LOGGING_LEVEL)

I have tried a couple of different variations of this and can't get python-json-logger to transform the logs to JSON. Perhaps I'm not reaching the root logger? Another option I have considered is manually formatting the logs into a JSON string, but no luck with that yet either. Any alternative ideas, tips, or support are appreciated.
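For reference, here is a minimal, stdlib-only sketch of the behavior I'm after, outside of Airflow (the class and handler names here are illustrative, not Airflow's). Note that the root logger's existing handlers have to be cleared first, or the plain-text handler installed by basicConfig keeps firing alongside the new one:

```python
import json
import logging
import sys


class JsonLineFormatter(logging.Formatter):
    """Render each log record as a single JSON object per line."""

    def format(self, record):
        payload = {
            "severity": record.levelname,
            "timestamp": self.formatTime(record, "%Y-%m-%dT%H:%M:%S"),
            "message": record.getMessage(),
        }
        return json.dumps(payload)


handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(JsonLineFormatter())

root = logging.getLogger()
root.handlers = []          # clear any handlers basicConfig already installed
root.addHandler(handler)
root.setLevel(logging.INFO)

root.info("task started")   # emits {"severity": "INFO", ...}
```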

Cheers!

Asked Oct 26 '18 by Matthew Bennett


People also ask

How do you write logs in Airflow?

By default, log file names have the following format: for standard tasks, dag_id={dag_id}/run_id={run_id}/task_id={task_id}/attempt={try_number}.log; for dynamically mapped tasks, dag_id={dag_id}/run_id={run_id}/task_id={task_id}/map_index={map_index}/attempt={try_number}.log.

Where are Airflow logs stored?

Users can specify the directory to place log files in airflow.cfg using base_log_folder. By default, logs are placed in the AIRFLOW_HOME directory.

How do you read Airflow logs?

You can also view the logs in the Airflow web interface. Streaming logs are a superset of the logs in Airflow. To access streaming logs, go to the Logs tab of the Environment details page in the Google Cloud console, or use Cloud Logging or Cloud Monitoring. Logging and Monitoring quotas apply.

How do you write an Elasticsearch log?

Airflow can be configured to read task logs from Elasticsearch and optionally write logs to stdout in standard or JSON format. These logs can later be collected and forwarded to the Elasticsearch cluster using tools like Fluentd, Logstash, or others.


1 Answer

I don't know if you ever solved this problem, but after some frustrating tinkering, I ended up getting this to play nicely with Airflow. For reference, I followed much of this article to get it working: https://www.astronomer.io/guides/logging/. The main issue is that Airflow's logging only accepts a string template for the logging format, which json-logging can't plug into. So you have to create your own logging classes and connect them to a custom logging config class.

  1. Copy the log template here into your $AIRFLOW_HOME/config folder, and change DEFAULT_LOGGING_CONFIG to LOGGING_CONFIG. When this works, bringing up Airflow will print a startup message that says Successfully imported user-defined logging config from logging_config.LOGGING_CONFIG. If this is the first .py file in the config folder, don't forget to add an empty __init__.py file so that Python picks it up.
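Airflow also has to be told to load that config. Assuming the standard setting name (logging_config_class lived under [core] in Airflow 1.x; newer versions moved it to the [logging] section), it looks roughly like:

```ini
# airflow.cfg -- load the custom logging config instead of the default
[core]
logging_config_class = logging_config.LOGGING_CONFIG
```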

  2. Write your custom JsonFormatter to inject into your handlers. I based mine on this one.
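My formatter is built on python-json-logger, but the idea can be sketched with the stdlib alone. The class below is a hypothetical stand-in, not my exact formatter: it accepts the same '(timestamp) (level) (name) (message)' style format string that the handler classes in step 3 pass in, and emits one JSON object per record:

```python
import json
import logging
import re


class CustomJsonFormatter(logging.Formatter):
    """Stdlib-only stand-in for a python-json-logger formatter.

    Parses a '(field) (field)' style format string and emits the
    named fields as one JSON object per log record.
    """

    def __init__(self, fmt="(timestamp) (level) (name) (message)"):
        super().__init__()
        # Extract field names like 'timestamp' from '(timestamp)'
        self.fields = re.findall(r"\((\w+)\)", fmt)

    def format(self, record):
        values = {
            "timestamp": self.formatTime(record, "%Y-%m-%dT%H:%M:%S"),
            "level": record.levelname,
            "name": record.name,
            "message": record.getMessage(),
        }
        return json.dumps({f: values[f] for f in self.fields if f in values})


# Quick demo: format a synthetic record
formatter = CustomJsonFormatter("(level) (name) (message)")
record = logging.LogRecord("airflow.task", logging.INFO, __file__, 1, "ok", None, None)
print(formatter.format(record))
```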

  3. Write the custom log handler classes. Since I was looking for JSON logging, mine look like this:

from logging.handlers import RotatingFileHandler

from airflow.utils.log.file_processor_handler import FileProcessorHandler
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.logging_mixin import RedirectStdHandler
from pythonjsonlogger import jsonlogger

# CustomJsonFormatter is the JSON formatter written in step 2

class JsonStreamHandler(RedirectStdHandler):
    def __init__(self, stream):
        super(JsonStreamHandler, self).__init__(stream)
        json_formatter = CustomJsonFormatter('(timestamp) (level) (name) (message)')
        self.setFormatter(json_formatter)


class JsonFileTaskHandler(FileTaskHandler):
    def __init__(self, base_log_folder, filename_template):
        super(JsonFileTaskHandler, self).__init__(base_log_folder, filename_template)
        json_formatter = CustomJsonFormatter('(timestamp) (level) (name) (message)')
        self.setFormatter(json_formatter)


class JsonFileProcessorHandler(FileProcessorHandler):
    def __init__(self, base_log_folder, filename_template):
        super(JsonFileProcessorHandler, self).__init__(base_log_folder, filename_template)
        json_formatter = CustomJsonFormatter('(timestamp) (level) (name) (message)')
        self.setFormatter(json_formatter)


class JsonRotatingFileHandler(RotatingFileHandler):
    def __init__(self, filename, mode, maxBytes, backupCount):
        super(JsonRotatingFileHandler, self).__init__(filename, mode, maxBytes, backupCount)
        json_formatter = CustomJsonFormatter('(timestamp) (level) (name) (message)')
        self.setFormatter(json_formatter)

  4. Hook them up to the logging configs in your custom logging_config.py file:
'handlers': {
    'console': {
        'class': 'logging_handler.JsonStreamHandler',
        'stream': 'sys.stdout'
    },
    'task': {
        'class': 'logging_handler.JsonFileTaskHandler',
        'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
        'filename_template': FILENAME_TEMPLATE,
    },
    'processor': {
        'class': 'logging_handler.JsonFileProcessorHandler',
        'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
        'filename_template': PROCESSOR_FILENAME_TEMPLATE,
    }
}
...

and

DEFAULT_DAG_PARSING_LOGGING_CONFIG = {
    'handlers': {
        'processor_manager': {
            'class': 'logging_handler.JsonRotatingFileHandler',
            'formatter': 'airflow',
            'filename': DAG_PROCESSOR_MANAGER_LOG_LOCATION,
            'mode': 'a',
            'maxBytes': 104857600,  # 100MB
            'backupCount': 5
        }
    }
...

JSON logs should then be emitted, both in the DAG task logs and on stdout.

Hope this helps!

Answered Oct 06 '22 by Ivan Peng