
How should I log while using multiprocessing in Python?

People also ask

Does logging work with multiprocessing?

The multiprocessing module has its own logger with the name "multiprocessing". This logger is used by objects and functions inside the module to log messages, such as debug messages that processes are running or have shut down. We can get this logger and use it for logging.
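
A minimal sketch of how to grab that built-in logger and send its internal messages to stderr (log_to_stderr() attaches a StreamHandler to it):

import logging
import multiprocessing

# Attach a stderr handler to the module's own "multiprocessing" logger.
logger = multiprocessing.log_to_stderr()
logger.setLevel(logging.DEBUG)
logger.info('multiprocessing logger is active')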

Is Python logging multiprocessing safe?

As Matino correctly explained: logging in a multiprocessing setup is not safe, because multiple processes (which know nothing about the others' existence) write into the same file and can potentially interfere with each other.

How does Python handle multiprocessing?

Multiprocessing in Python using the Pool class is similar to a map-reduce architecture: in essence, it maps the input across different worker processes and collects the output from all of them as a list. Processes that are executing are kept in memory, while non-executing processes are stored out of memory.
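
A minimal illustration of that map-and-collect pattern (the function square and the worker count are just for the example):

import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    # Map the inputs across four worker processes and collect the results as a list.
    with multiprocessing.Pool(4) as pool:
        print(pool.map(square, range(10)))  # [0, 1, 4, 9, ...]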

How lock in multiprocessing Python?

Python provides a mutual exclusion lock for use with processes via the multiprocessing.Lock class. An instance of the lock can be created, acquired by a process before it enters a critical section, and released after the critical section. Only one process can hold the lock at any time.
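
A short sketch of that pattern, assuming the workers all append to one shared file (the filename shared.log is made up):

import multiprocessing

def worker(lock, path, i):
    # Only one process at a time may enter the critical section.
    with lock:
        with open(path, 'a') as f:
            f.write('line from worker {}\n'.format(i))

if __name__ == '__main__':
    lock = multiprocessing.Lock()
    procs = [multiprocessing.Process(target=worker, args=(lock, 'shared.log', i))
             for i in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()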


I just wrote a log handler of my own that feeds everything to the parent process via a pipe. I've only been testing it for ten minutes, but it seems to work pretty well.

(Note: This is hardcoded to RotatingFileHandler, which is my own use case.)


Update: @javier now maintains this approach as a package available on PyPI - see multiprocessing-logging on PyPI, with the source on GitHub at https://github.com/jruere/multiprocessing-logging


Update: The implementation below now uses a queue for correct handling of concurrency, and also recovers from errors correctly. I've been using this in production for several months, and the current version works without issue.

from logging.handlers import RotatingFileHandler
import multiprocessing
import threading
import logging
import sys
import traceback

class MultiProcessingLog(logging.Handler):
    def __init__(self, name, mode, maxsize, rotate):
        logging.Handler.__init__(self)

        self._handler = RotatingFileHandler(name, mode, maxsize, rotate)
        self.queue = multiprocessing.Queue(-1)

        # Background thread in the parent process: it drains the queue and
        # writes each record to disk through the real RotatingFileHandler.
        t = threading.Thread(target=self.receive)
        t.daemon = True
        t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        while True:
            try:
                record = self.queue.get()
                self._handler.emit(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

    def send(self, s):
        self.queue.put_nowait(s)

    def _format_record(self, record):
        # ensure that exc_info and args
        # have been stringified.  Removes any chance of
        # unpickleable things inside and possibly reduces
        # message size sent over the pipe
        if record.args:
            record.msg = record.msg % record.args
            record.args = None
        if record.exc_info:
            dummy = self.format(record)
            record.exc_info = None

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        self._handler.close()
        logging.Handler.close(self)
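
A short usage sketch (not from the original answer): attach the handler to the root logger in the parent before spawning workers, assuming a fork start method so the children inherit the configured logger. The filename combined.log is made up.

import logging
import multiprocessing

def work(n):
    logging.info('message %s from worker', n)

if __name__ == '__main__':
    handler = MultiProcessingLog('combined.log', 'a', 1024 * 1024, 5)
    handler.setFormatter(logging.Formatter('%(asctime)s %(processName)s %(message)s'))
    root = logging.getLogger()
    root.addHandler(handler)
    root.setLevel(logging.INFO)

    # Workers inherit the handler; their records travel back over its queue
    # and are written by the receive thread in the parent.
    with multiprocessing.Pool(4) as pool:
        pool.map(work, range(8))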

The only way to deal with this non-intrusively is to:

  1. Spawn each worker process such that its log goes to a different file descriptor (to disk or to a pipe). Ideally, all log entries should be timestamped.
  2. Your controller process can then do one of the following:
    • If using disk files: coalesce the log files at the end of the run, sorted by timestamp (see the sketch after this list).
    • If using pipes (recommended): coalesce log entries on the fly from all pipes into a central log file. (E.g., periodically select on the pipes' file descriptors, perform a merge-sort on the available log entries, and flush to the centralized log. Repeat.)
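
A minimal sketch of the disk-file variant, assuming each worker writes timestamp-prefixed lines to its own worker_*.log file (a made-up naming scheme), so that lexicographic order matches chronological order:

import glob
import heapq

def coalesce(pattern='worker_*.log', out_path='combined.log'):
    # Each input file is already sorted by its leading timestamp, so
    # heapq.merge performs the merge-sort step across all of them.
    files = [open(name) for name in sorted(glob.glob(pattern))]
    try:
        with open(out_path, 'w') as out:
            for line in heapq.merge(*files):
                out.write(line)
    finally:
        for f in files:
            f.close()

if __name__ == '__main__':
    coalesce()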

QueueHandler is native in Python 3.2+, and does exactly this. It is easily replicated in previous versions.

The Python docs include two complete examples: Logging to a single file from multiple processes

For those using Python < 3.2, just copy QueueHandler into your own code from https://gist.github.com/vsajip/591589, or install the logutils package, which provides it.

Each process (including the parent process) puts its logging on the Queue, and then a listener thread or process (one example is provided for each) picks those up and writes them all to a file - no risk of corruption or garbling.
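
For the listener-process variant (as opposed to the QueueListener thread used in the next answer), a condensed sketch along the lines of the docs cookbook might look like this; the filename mp_app.log and the worker count are made up:

import logging
import logging.handlers
import multiprocessing

def listener_process(queue):
    # The listener owns the real handler and drains the queue until it
    # receives the None sentinel.
    handler = logging.FileHandler('mp_app.log')
    handler.setFormatter(logging.Formatter('%(asctime)s %(processName)s %(levelname)s %(message)s'))
    logging.getLogger().addHandler(handler)
    while True:
        record = queue.get()
        if record is None:
            break
        logging.getLogger(record.name).handle(record)

def worker_process(queue):
    # Workers only attach a QueueHandler; records travel over the queue.
    root = logging.getLogger()
    root.addHandler(logging.handlers.QueueHandler(queue))
    root.setLevel(logging.INFO)
    logging.info('hello from %s', multiprocessing.current_process().name)

if __name__ == '__main__':
    queue = multiprocessing.Queue(-1)
    listener = multiprocessing.Process(target=listener_process, args=(queue,))
    listener.start()
    workers = [multiprocessing.Process(target=worker_process, args=(queue,))
               for _ in range(3)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    queue.put(None)  # tell the listener to finish up
    listener.join()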


Below is another solution with a focus on simplicity, for anyone else (like me) who gets here from Google. Logging should be easy! It only works on Python 3.2 or higher.

import multiprocessing
import logging
from logging.handlers import QueueHandler, QueueListener
import time
import random


def f(i):
    time.sleep(random.uniform(.01, .05))
    logging.info('function called with %s in worker process.', i)
    time.sleep(random.uniform(.01, .05))
    return i


def worker_init(q):
    # all records from worker processes go to qh and then into q
    qh = QueueHandler(q)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(qh)


def logger_init():
    q = multiprocessing.Queue()
    # this is the handler for all log records
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(levelname)s: %(asctime)s - %(process)s - %(message)s"))

    # ql gets records from the queue and sends them to the handler
    ql = QueueListener(q, handler)
    ql.start()

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    # add the handler to the logger so records from this process are handled
    logger.addHandler(handler)

    return ql, q


def main():
    q_listener, q = logger_init()

    logging.info('hello from main thread')
    pool = multiprocessing.Pool(4, worker_init, [q])
    for result in pool.map(f, range(10)):
        pass
    pool.close()
    pool.join()
    q_listener.stop()

if __name__ == '__main__':
    main()