
How do I log multiple very similar events gracefully in python?

With Python's logging module, is there a way to collect multiple events into one log entry? An ideal solution would be an extension of Python's logging module or a custom formatter/filter for it, so that collecting logging events of the same kind happens in the background and nothing needs to be added to the code body (e.g. at every call of a logging function).

Here is an example that generates a large number of identical or very similar logging events:

import logging

for i in range(99999): 
    try:
        asdf[i]   # not defined!
    except NameError:
        logging.exception('foo') # generates large number of logging events
    else: pass

# ... more code with more logging ...

for i in range(88888): logging.info('more of the same %d' % i)

# ... and so on ...

This raises and logs the same exception 99999 times. It would be nice if the log just said something like:

ERROR:root:foo (occurred 99999 times)
Traceback (most recent call last):
  File "./exceptionlogging.py", line 10, in <module>
    asdf[i]   # not defined!
NameError: name 'asdf' is not defined

INFO:root:foo more of the same (occurred 88888 times with various values)

con-f-use asked Dec 04 '15 14:12



2 Answers

You should probably be writing a message aggregate/statistics class rather than trying to hook onto the logging system's singletons, but I guess you may have an existing code base that uses logging.

I'd also suggest that you create named loggers rather than always using the default root logger. The Python Logging Cookbook has extensive explanations and examples.
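As a minimal sketch of that suggestion (the logger name "myapp.worker" and the format string are made up for illustration, not taken from the question), a named logger with its own handler could look like this:

```python
import logging

# "myapp.worker" is a hypothetical name chosen for this example.
logger = logging.getLogger("myapp.worker")
logger.setLevel(logging.INFO)

handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(levelname)s:%(name)s:%(message)s"))
logger.addHandler(handler)

# Keep these records away from whatever is attached to the root logger.
logger.propagate = False

logger.info("worker started")
```

Filters and handlers attached to such a logger only affect records logged through it, which makes it easier to aggregate one noisy component without touching the rest.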

The following class should do what you are asking.

import logging
import atexit

class Aggregator(object):
    logs = {}  # key -> [count, first record seen]

    @classmethod
    def _aggregate(cls, record):
        key = '{0[levelname]}:{0[name]}:{0[msg]}'.format(record.__dict__)
        if key not in cls.logs: # first occurrence
            cls.logs[key] = [1, record]
        else: # subsequent occurrence
            cls.logs[key][0] += 1

    @classmethod
    def _output(cls):
        # Detach the filter first; otherwise the summary records would be
        # swallowed (and aggregated again) by this very filter.
        logging.getLogger().removeFilter(cls)
        for count, record in cls.logs.values():
            record.msg += ' (occurred {} times)'.format(count)
            logging.getLogger(record.name).handle(record)

    @staticmethod
    def filter(record):
        Aggregator._aggregate(record)
        return False  # suppress the record for now

    @staticmethod
    def exit():
        Aggregator._output()



logging.getLogger().addFilter(Aggregator)
atexit.register(Aggregator.exit)

for i in range(99999): 
    try:
        asdf[i]   # not defined!
    except NameError:
        logging.exception('foo') # generates large number of logging events
    else: pass

# ... more code with more logging ...
for i in range(88888): logging.error('more of the same')

# ... and so on ...    

Note that you don't get any logs until the program exits.

Running it produces:

    ERROR:root:foo (occurred 99999 times)
    Traceback (most recent call last):
      File "C:\work\VEMS\python\logcount.py", line 38, in <module>
        asdf[i]   # not defined!
    NameError: name 'asdf' is not defined
    ERROR:root:more of the same (occurred 88888 times)

Mike Robins answered Oct 20 '22 00:10



Your question hides an implicit assumption about how "very similar" is defined. Log records can either be const-only (all instances are strictly identical) or a mix of constants and variables (a record with no constants at all also counts as a mix).

An aggregator for const-only log records is a piece of cake. You just need to decide whether records from different processes/threads should be aggregated separately or together. For log records that include both constants and variables, you'll also need to decide whether to split your aggregation based on the values of those variables.

A dictionary-style counter (from collections import Counter) can serve as a cache that counts your instances in O(1) per record, but you may need some higher-level structure if you also want to record the variables. Additionally, you'll have to handle writing the cache to a file yourself, either every X seconds (binning) or once the program has exited (risky: you may lose all in-memory data if something gets stuck).
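To make the Counter idea concrete, here is a rough sketch rather than a finished design. The class name CountingHandler, its by_thread flag and the logger name "counter_demo" are invented for this example. It keys on the unformatted message template, so records that differ only in their arguments fall into the same bucket:

```python
import logging
from collections import Counter


class CountingHandler(logging.Handler):
    """Hypothetical handler: counts records instead of writing them."""

    def __init__(self, by_thread=False):
        super().__init__()
        self.counts = Counter()
        self.by_thread = by_thread

    def emit(self, record):
        # record.msg is the template before %-style arguments are applied,
        # so "value %d" with different arguments shares one key.
        key = (record.levelname, record.name, str(record.msg))
        if self.by_thread:
            # Optionally split the aggregation per process and thread.
            key += (record.process, record.thread)
        self.counts[key] += 1


demo_logger = logging.getLogger("counter_demo")
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False
counting = CountingHandler()
demo_logger.addHandler(counting)

for i in range(1000):
    demo_logger.info("more of the same %d", i)

for (level, name, msg), n in counting.counts.items():
    print("{}:{}:{} (occurred {} times)".format(level, name, msg, n))
```

This only works when arguments are passed to the logging call separately. If the message is formatted before the call, e.g. with "%" or str.format, every record has a different msg and nothing is merged.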

A framework for aggregation would look something like this (tested on Python v3.4):

import logging
from logging import Handler
from threading import RLock, Timer
from collections import defaultdict


class LogAggregatorHandler(Handler):

    _default_flush_timer = 300  # Number of seconds between flushes
    _default_separator = "\t"  # Separator char between metadata strings
    _default_metadata = ["filename", "name", "funcName", "lineno", "levelname"]  # metadata defining unique log records

    class LogAggregatorCache(object):
        """ Keeps whatever is interesting in log records aggregation. """
        def __init__(self, record=None):
            self.message = None
            self.counter = 0
            self.timestamp = list()
            self.args = list()
            if record is not None:
                self.cache(record)

        def cache(self, record):
            if self.message is None:  # Only the first message is kept
                self.message = record.msg
            assert self.message == record.msg, "Non-matching log record"  # note: will not work with string formatting for log records; e.g. "blah {}".format(i)
            self.timestamp.append(record.created)
            self.args.append(record.args)
            self.counter += 1

        def __str__(self):
            """ The string of this object is used as the default output of log records aggregation. For example: record message with occurrences. """
            return self.message + "\t (occurred {} times)".format(self.counter)

    def __init__(self, flush_timer=None, separator=None, add_process_thread=False):
        """
        Log record metadata will be concatenated to a unique string, separated by self._separator.
        Process and thread IDs will be added to the metadata if set to True; otherwise log records across processes/threads will be aggregated together.
        :param separator: str
        :param add_process_thread: bool
        """
        super().__init__()
        self._flush_timer = flush_timer or self._default_flush_timer
        self._cache = self.cache_factory()
        self._separator = separator or self._default_separator
        self._metadata = list(self._default_metadata)  # copy, so extending it never mutates the class-level default
        if add_process_thread is True:
            self._metadata += ["process", "thread"]
        self._aggregation_lock = RLock()
        self._store_aggregation_timer = self.flush_timer_factory()
        self._store_aggregation_timer.start()

        # Demo logger which outputs aggregations through a StreamHandler:
        self.agg_log = logging.getLogger("aggregation_logger")
        self.agg_log.addHandler(logging.StreamHandler())
        self.agg_log.setLevel(logging.DEBUG)
        self.agg_log.propagate = False

    def cache_factory(self):
        """ Returns an instance of a new caching object. """
        return defaultdict(self.LogAggregatorCache)

    def flush_timer_factory(self):
        """ Returns a threading.Timer daemon object which flushes the Handler aggregations. """
        timer = Timer(self._flush_timer, self.flush)
        timer.daemon = True
        return timer

    def find_unique(self, record):
        """ Extracts a unique metadata string from log records. """
        metadata = ""
        for single_metadata in self._metadata:
            value = getattr(record, single_metadata, "missing " + str(single_metadata))
            metadata += str(value) + self._separator
        return metadata[:-len(self._separator)]

    def emit(self, record):
        try:
            with self._aggregation_lock:
                metadata = self.find_unique(record)
                self._cache[metadata].cache(record)
        except Exception:
            self.handleError(record)

    def flush(self):
        self.store_aggregation()

    def store_aggregation(self):
        """ Write the aggregation data to file. """
        self._store_aggregation_timer.cancel()
        del self._store_aggregation_timer
        with self._aggregation_lock:
            temp_aggregation = self._cache
            self._cache = self.cache_factory()

        # ---> handle temp_aggregation and write to file <--- #
        for key, value in sorted(temp_aggregation.items()):
            self.agg_log.info("{}\t{}".format(key, value))

        # ---> re-create the store_aggregation Timer object <--- #
        self._store_aggregation_timer = self.flush_timer_factory()
        self._store_aggregation_timer.start()

Testing this Handler class with random log severity in a for-loop:

if __name__ == "__main__":
    import random
    import logging

    logger = logging.getLogger()
    handler = LogAggregatorHandler()
    logger.addHandler(handler)
    logger.addHandler(logging.StreamHandler())
    logger.setLevel(logging.DEBUG)

    logger.info("entering logging loop")

    for i in range(25):
        # Randomly choose log severity:
        severity = random.choice([logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL])
        logger.log(severity, "test message number %s", i)

    logger.info("end of test code")

If you want to aggregate on additional fields, this is what a Python log record looks like:

{'args': ['()'],
 'created': ['1413747902.18'],
 'exc_info': ['None'],
 'exc_text': ['None'],
 'filename': ['push_socket_log.py'],
 'funcName': ['<module>'],
 'levelname': ['DEBUG'],
 'levelno': ['10'],
 'lineno': ['17'],
 'module': ['push_socket_log'],
 'msecs': ['181.387901306'],
 'msg': ['Test message.'],
 'name': ['__main__'],
 'pathname': ['./push_socket_log.py'],
 'process': ['65486'],
 'processName': ['MainProcess'],
 'relativeCreated': ['12.6709938049'],
 'thread': ['140735262810896'],
 'threadName': ['MainThread']}

One more thing to think about: Most features you run depend on a flow of several consecutive commands (which will ideally report log records accordingly); e.g. a client-server communication will typically depend on receiving a request, processing it, reading some data from the DB (which requires a connection and some read commands), some kind of parsing/processing, constructing the response packet and reporting the response code.

This highlights one of the main disadvantages of using an aggregation approach: by aggregating log records you lose track of the time and order of the actions that took place. It will be extremely difficult to figure out which request was incorrectly structured if you only have the aggregation at hand. My advice in this case is that you keep both the raw data and the aggregation (using two file handlers or something similar), so that you can investigate at both the macro level (aggregation) and the micro level (normal logging).
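One possible way to keep both views, sketched under assumptions: SummaryHandler, the logger name "raw_and_summary_demo" and the temporary raw.log path are placeholders invented for this example. The idea is to attach a plain FileHandler for the raw records and a counting handler for the aggregation to the same logger:

```python
import logging
import os
import tempfile
from collections import Counter


class SummaryHandler(logging.Handler):
    """Hypothetical handler that only counts records per message template."""

    def __init__(self):
        super().__init__()
        self.counts = Counter()

    def emit(self, record):
        self.counts[(record.levelname, str(record.msg))] += 1


# Raw records go to a file (a temporary directory here, for the demo only).
raw_path = os.path.join(tempfile.mkdtemp(), "raw.log")
raw_handler = logging.FileHandler(raw_path)
raw_handler.setFormatter(
    logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
summary_handler = SummaryHandler()

both_logger = logging.getLogger("raw_and_summary_demo")
both_logger.setLevel(logging.INFO)
both_logger.propagate = False
both_logger.addHandler(raw_handler)      # micro level: every record, in order
both_logger.addHandler(summary_handler)  # macro level: counts only

for request_id in range(3):
    both_logger.info("handled request %d", request_id)

raw_handler.flush()
with open(raw_path) as fh:
    raw_lines = fh.read().splitlines()
```

The raw file keeps timestamps and ordering for later investigation, while summary_handler.counts gives the condensed picture. How and when the counts are written out is left open here.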

However, you are still left with the responsibility of finding out that things have gone wrong, and then manually investigating what caused it. When developing on your PC this is an easy enough task, but deploying your code on several production servers makes these tasks cumbersome and wastes a lot of your time. Accordingly, there are several companies developing products specifically for log management. Most aggregate similar log records together, but others incorporate machine learning algorithms for automatic aggregation and for learning your software's behavior. Outsourcing your log handling can then enable you to focus on your product instead of on your bugs.

Disclaimer: I work for Coralogix, one such solution.

redlus answered Oct 20 '22 01:10
