I have a requirement to send the Apache Airflow logs to stdout in JSON format. Airflow does not seem to provide this capability out of the box. I have found a couple of Python modules that can do this, but I cannot get the implementation to work.
Currently, I am adding a class in airflow/utils/logging.py to modify the logger, shown below:
import datetime

from pythonjsonlogger import jsonlogger


class StackdriverJsonFormatter(jsonlogger.JsonFormatter, object):
    def __init__(self, fmt="%(levelname) %(asctime) %(nanotime) %(severity) %(message)", style='%', *args, **kwargs):
        jsonlogger.JsonFormatter.__init__(self, fmt=fmt, *args, **kwargs)

    def process_log_record(self, log_record):
        # Rename 'level'/'levelname' to 'severity' for Stackdriver.
        if log_record.get('level'):
            log_record['severity'] = log_record['level']
            del log_record['level']
        else:
            log_record['severity'] = log_record['levelname']
            del log_record['levelname']
        # Rename 'asctime' to 'timestamp'.
        if log_record.get('asctime'):
            log_record['timestamp'] = log_record['asctime']
            del log_record['asctime']
        now = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%fZ')
        log_record['nanotime'] = now
        return super(StackdriverJsonFormatter, self).process_log_record(log_record)
I am wiring this up in /airflow/settings.py as shown below:
import sys

from airflow.utils import logging as logconf


def configure_logging(log_format=LOG_FORMAT):
    # LOG_FORMAT and LOGGING_LEVEL come from earlier in settings.py
    handler = logconf.logging.StreamHandler(sys.stdout)
    formatter = logconf.StackdriverJsonFormatter()
    handler.setFormatter(formatter)
    logging = logconf.logging.getLogger()
    logging.addHandler(handler)
    ''' code below was original airflow source code
    logging.root.handlers = []
    logging.basicConfig(
        format=log_format, stream=sys.stdout, level=LOGGING_LEVEL)
    '''
I have tried a couple of different variations of this and can't get python-json-logger to transform the logs to JSON. Perhaps I'm not reaching the root logger? Another option I have considered is manually formatting the logs into a JSON string, but no luck with that yet either. Any alternative ideas, tips, or support are appreciated.
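For reference, a minimal standalone sketch (outside of Airflow) of attaching a JSON formatter to the root logger looks roughly like this; if this prints JSON but Airflow's output does not, Airflow is most likely resetting the root handlers after this configuration runs:

import logging
import sys

from pythonjsonlogger import jsonlogger

# Minimal sketch, not Airflow-specific: wire a python-json-logger formatter
# onto the root logger and confirm the output is JSON.
root = logging.getLogger()
root.handlers = []  # drop any pre-existing handlers
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(jsonlogger.JsonFormatter('%(levelname) %(asctime) %(message)'))
root.addHandler(handler)
root.setLevel(logging.INFO)

logging.getLogger(__name__).info("hello")  # should print a JSON object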
Cheers!
By default, log file names have the following format. For standard tasks: dag_id={dag_id}/run_id={run_id}/task_id={task_id}/attempt={try_number}.log. For dynamically mapped tasks: dag_id={dag_id}/run_id={run_id}/task_id={task_id}/map_index={map_index}/attempt={try_number}.log.
Users can specify the directory to place log files in airflow.cfg using base_log_folder. By default, logs are placed in the AIRFLOW_HOME directory.
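For context, that folder is set in the [logging] section of airflow.cfg (in older Airflow versions the same option lives under [core]); a minimal sketch with an example path:

[logging]
# Example path only; the default is {AIRFLOW_HOME}/logs
base_log_folder = /opt/airflow/logs
# The per-task file name format shown above is controlled by
# log_filename_template (left at its default here).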
You can also view the logs in the Airflow web interface. Streaming logs: these logs are a superset of the logs in Airflow. To access streaming logs, you can go to the logs tab of the Environment details page in the Google Cloud console, use Cloud Logging, or use Cloud Monitoring. Logging and Monitoring quotas apply.
Airflow can be configured to read task logs from Elasticsearch and, optionally, to write logs to stdout in standard or JSON format. These logs can later be collected and forwarded to the Elasticsearch cluster using tools like Fluentd, Logstash, or others.
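The stdout/JSON behaviour mentioned here is controlled by the [elasticsearch] section of airflow.cfg (it only takes effect when the Elasticsearch task handler is enabled); a sketch, assuming Airflow 2.x option names:

[elasticsearch]
# Write task logs to stdout so a log shipper (Fluentd, Logstash, ...) can collect them.
write_stdout = True
# Emit each log line as a JSON document.
json_format = True
# LogRecord attributes included in each JSON document.
json_fields = asctime, filename, lineno, levelname, message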
I don't know if you ever solved this problem, but after some frustrating tinkering, I ended up getting this to play nice with airflow. For reference, I followed a lot of this article to get it working: https://www.astronomer.io/guides/logging/. The main issue was that Airflow's logging only accepts a string template for the logging format, which json-logging can't plug into. So you have to create your own logging classes and connect them to a custom logging config class.
Copy the log template here into your $AIRFLOW_HOME/config folder, and change DEFAULT_CONFIG_LOGGING to CONFIG_LOGGING. When you're successful, bring up Airflow and you'll get a log message on startup that says Successfully imported user-defined logging config from logging_config.LOGGING_CONFIG. If this is the first .py file in the config folder, don't forget to add a blank __init__.py file so Python picks it up.
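Airflow also has to be pointed at that config for it to be loaded at all; the startup message above implies a setting along these lines in airflow.cfg (under [core] instead of [logging] on older versions):

[logging]
# Module path to the dict-config object inside $AIRFLOW_HOME/config/logging_config.py
logging_config_class = logging_config.LOGGING_CONFIG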
Write your custom JsonFormatter to inject into your handlers. I based mine on this one.
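A minimal sketch of such a formatter (the class name CustomJsonFormatter is only an assumption, chosen to match what the handler classes below expect; add_fields is the standard python-json-logger extension point):

from datetime import datetime, timezone

from pythonjsonlogger import jsonlogger


class CustomJsonFormatter(jsonlogger.JsonFormatter):
    def add_fields(self, log_record, record, message_dict):
        super(CustomJsonFormatter, self).add_fields(log_record, record, message_dict)
        # Fill in the fields named in the '(timestamp) (level) (name) (message)'
        # format string when the LogRecord doesn't provide them directly.
        if not log_record.get('timestamp'):
            log_record['timestamp'] = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ')
        if log_record.get('level'):
            log_record['level'] = log_record['level'].upper()
        else:
            log_record['level'] = record.levelname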
Write the custom log handler classes. Since I was looking for JSON logging, mine look like this:
from logging.handlers import RotatingFileHandler

from airflow.utils.log.file_processor_handler import FileProcessorHandler
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.logging_mixin import RedirectStdHandler
from pythonjsonlogger import jsonlogger

# CustomJsonFormatter is the JSON formatter described above (defined in, or
# imported into, this module).


class JsonStreamHandler(RedirectStdHandler):
    def __init__(self, stream):
        super(JsonStreamHandler, self).__init__(stream)
        json_formatter = CustomJsonFormatter('(timestamp) (level) (name) (message)')
        self.setFormatter(json_formatter)


class JsonFileTaskHandler(FileTaskHandler):
    def __init__(self, base_log_folder, filename_template):
        super(JsonFileTaskHandler, self).__init__(base_log_folder, filename_template)
        json_formatter = CustomJsonFormatter('(timestamp) (level) (name) (message)')
        self.setFormatter(json_formatter)


class JsonFileProcessorHandler(FileProcessorHandler):
    def __init__(self, base_log_folder, filename_template):
        super(JsonFileProcessorHandler, self).__init__(base_log_folder, filename_template)
        json_formatter = CustomJsonFormatter('(timestamp) (level) (name) (message)')
        self.setFormatter(json_formatter)


class JsonRotatingFileHandler(RotatingFileHandler):
    def __init__(self, filename, mode, maxBytes, backupCount):
        super(JsonRotatingFileHandler, self).__init__(filename, mode, maxBytes, backupCount)
        json_formatter = CustomJsonFormatter('(timestamp) (level) (name) (message)')
        self.setFormatter(json_formatter)
Then hook them into the handlers section of the LOGGING_CONFIG in your logging_config.py (logging_handler here is the module containing the classes above):

'handlers': {
    'console': {
        'class': 'logging_handler.JsonStreamHandler',
        'stream': 'sys.stdout'
    },
    'task': {
        'class': 'logging_handler.JsonFileTaskHandler',
        'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
        'filename_template': FILENAME_TEMPLATE,
    },
    'processor': {
        'class': 'logging_handler.JsonFileProcessorHandler',
        'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
        'filename_template': PROCESSOR_FILENAME_TEMPLATE,
    }
}
...
and
DEFAULT_DAG_PARSING_LOGGING_CONFIG = {
    'handlers': {
        'processor_manager': {
            'class': 'logging_handler.JsonRotatingFileHandler',
            'formatter': 'airflow',
            'filename': DAG_PROCESSOR_MANAGER_LOG_LOCATION,
            'mode': 'a',
            'maxBytes': 104857600,  # 100MB
            'backupCount': 5
        }
    }
...
JSON logs should then be emitted both in the DAG/task log files and in the console output.
Hope this helps!