Logging with multiprocessing madness

I'm trying to use Python's standard logging module in a multiprocessing scenario. I've read:

  1. Python MultiProcess, Logging, Various Classes
  2. Logging using multiprocessing

and multiple other posts about multiprocessing, logging, Python classes and such. After all this reading I've arrived at the following piece of code, which uses the logutils QueueHandler, but I cannot make it run properly:

import sys
import logging
from logging import INFO
from multiprocessing import Process, Queue as mpQueue
import threading
import time

from logutils.queue import QueueListener, QueueHandler


class Worker(Process):

    def __init__(self, n, q):
        super(Worker, self).__init__()
        self.n = n
        self.queue = q

        self.qh = QueueHandler(self.queue)
        self.root = logging.getLogger()
        self.root.addHandler(self.qh)
        self.root.setLevel(logging.DEBUG)        
        self.logger = logging.getLogger("W%i"%self.n)


    def run(self):
        self.logger.info("Worker %i Starting"%self.n)

        for i in xrange(10):
            self.logger.log(INFO, "testing %i"%i)

        self.logger.log(INFO, "Completed %i"%self.n)


def listener_process(queue):
    while True:
        try:
            record = queue.get()
            if record is None:
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            import sys, traceback
            print >> sys.stderr, 'Whoops! Problem:'
            traceback.print_exc(file=sys.stderr)

if __name__ == "__main__":

    mpq = mpQueue(-1)

    root = logging.getLogger()
    h = logging.StreamHandler()    
    f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s     %(message)s')
    h.setFormatter(f)
    root.addHandler(h)

    l = logging.getLogger("Test")
    l.setLevel(logging.DEBUG)

    listener = Process(target=listener_process,
                       args=(mpq,))
    listener.start()
    workers=[]
    for i in xrange(1):
        worker = Worker(i, mpq)
        worker.daemon = True
        worker.start()
        workers.append(worker)

    for worker in workers:
        worker.join()

    mpq.put_nowait(None)
    listener.join()

    for i in xrange(10):
        l.info("testing %i"%i)

    print "Finish"

When the code is executed, the output somehow contains duplicated lines, like:

2013-12-02 16:44:46,002 Worker-2   W0 INFO         Worker 0 Starting
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 0
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 1
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 2
2013-12-02 16:44:46,002 Worker-2   W0 INFO         Worker 0 Starting
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 3
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 0
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 1
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 4
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 2
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 3
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 5
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 4
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 6
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 5
2013-12-02 16:44:46,004 Worker-2   W0 INFO         testing 7
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 6
2013-12-02 16:44:46,004 Worker-2   W0 INFO         testing 8
2013-12-02 16:44:46,004 Worker-2   W0 INFO         testing 7
2013-12-02 16:44:46,004 Worker-2   W0 INFO         testing 9
2013-12-02 16:44:46,004 Worker-2   W0 INFO         testing 8
2013-12-02 16:44:46,004 Worker-2   W0 INFO         Completed 0
2013-12-02 16:44:46,004 Worker-2   W0 INFO         testing 9
2013-12-02 16:44:46,004 Worker-2   W0 INFO         Completed 0
2013-12-02 16:44:46,005 MainProcess Test INFO         testing 0
2013-12-02 16:44:46,005 MainProcess Test INFO         testing 1
2013-12-02 16:44:46,005 MainProcess Test INFO         testing 2
2013-12-02 16:44:46,005 MainProcess Test INFO         testing 3
2013-12-02 16:44:46,005 MainProcess Test INFO         testing 4
2013-12-02 16:44:46,005 MainProcess Test INFO         testing 5
2013-12-02 16:44:46,006 MainProcess Test INFO         testing 6
2013-12-02 16:44:46,006 MainProcess Test INFO         testing 7
2013-12-02 16:44:46,006 MainProcess Test INFO         testing 8
2013-12-02 16:44:46,006 MainProcess Test INFO         testing 9
Finish

In other questions it's suggested that the handler gets added more than once, but, as you can see, I only add the StreamHandler once, in the main block. I've already tried wrapping the main block in a class, with the same result.
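
One way to double-check this is to print the root logger's handlers from inside run() (which, unlike __init__, executes in the child process). A quick diagnostic sketch, only using the standard logging API:

    def run(self):
        # diagnostic only: show the handlers this (child) process really has
        print logging.getLogger().handlers

        self.logger.info("Worker %i Starting" % self.n)
        for i in xrange(10):
            self.logger.log(INFO, "testing %i" % i)
        self.logger.log(INFO, "Completed %i" % self.n)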

EDIT: as @max suggested (or at least what I believe he meant), I've modified the code of the worker class as follows:

class Worker(Process):

    root = logging.getLogger()
    qh = None

    def __init__(self, n, q):
        super(Worker, self).__init__()
        self.n = n
        self.queue = q

        if not self.qh:
            Worker.qh = QueueHandler(self.queue)            
            Worker.root.addHandler(self.qh)
            Worker.root.setLevel(logging.DEBUG)

        self.logger = logging.getLogger("W%i"%self.n)

        print self.root.handlers

    def run(self):
        self.logger.info("Worker %i Starting"%self.n)

        for i in xrange(10):
            self.logger.log(INFO, "testing %i"%i)

        self.logger.log(INFO, "Completed %i"%self.n)

With the same results. Now the queue handler is not added again and again, but there are still duplicate log entries, even with just one worker.

EDIT2: I've changed the code a little. I replaced the listener process with a QueueListener (which is what I intended in the beginning anyway) and moved the main code into a class.

import sys

import logging
from logging import INFO
from multiprocessing import Process, Queue as mpQueue
import threading
import time

from logutils.queue import QueueListener, QueueHandler

root = logging.getLogger()
added_qh = False

class Worker(Process):

    def __init__(self, logconf, n, qh):
        super(Worker, self).__init__()
        self.n = n
        self.logconf = logconf

#        global root
        global added_qh

        if not added_qh:
            added_qh = True
            root.addHandler(qh)
            root.setLevel(logging.DEBUG)            

        self.logger = logging.getLogger("W%i"%self.n)

        #print root.handlers

    def run(self):
        self.logger.info("Worker %i Starting"%self.n)

        for i in xrange(10):
            self.logger.log(INFO, "testing %i"%i)

        self.logger.log(INFO, "Completed %i"%self.n)


class Main(object):

    def __init__(self):
        pass

    def start(self):

        mpq = mpQueue(-1)
        qh = QueueHandler(mpq)

        h = logging.StreamHandler()

        ql = QueueListener(mpq, h)

        #h.setFormatter(f)
        root.addHandler(qh)

        l = logging.getLogger("Test")
        l.setLevel(logging.DEBUG)

        workers=[]

        for i in xrange(15):
            worker = Worker(logconf, i, qh)
            worker.daemon = True
            worker.start()
            workers.append(worker)

        for worker in workers:
            print "joining worker: {}".format(worker)
            worker.join()

        mpq.put_nowait(None)

        ql.start()

        # listener.join()

        for i in xrange(10):
            l.info("testing %i"%i)

if __name__ == "__main__":


    x = Main()
    x.start()

    time.sleep(10)

    print "Finish"

Now it mostly works until I reach a certain number of workers (~15), at which point, for some reason, the Main class gets blocked in the join and the remaining workers do nothing.

asked Dec 02 '13 by Tritt



2 Answers

I'm coming in late, so you probably don't need the answer anymore. The problem comes from the fact that you already have a handler set on the root logger in your main process, and in your worker you are adding another one. This means that in your worker process two handlers are in fact handling each record: the one pushing the log record to the queue, and the one writing directly to the stream.

You can fix this simply by adding an extra line, self.root.handlers = [], to your code. Starting from your original code, the __init__ method of the worker would look like this:

def __init__(self, n, q):
    super(Worker, self).__init__()
    self.n = n
    self.queue = q

    self.qh = QueueHandler(self.queue)
    self.root = logging.getLogger()
    self.root.handlers = []
    self.root.addHandler(self.qh)
    self.root.setLevel(logging.DEBUG)
    self.logger = logging.getLogger("W%i"%self.n)

The output now looks like this:

python workers.py 
2016-05-12 10:07:02,971 Worker-2   W0 INFO         Worker 0 Starting
2016-05-12 10:07:02,972 Worker-2   W0 INFO         testing 0
2016-05-12 10:07:02,973 Worker-2   W0 INFO         testing 1
2016-05-12 10:07:02,973 Worker-2   W0 INFO         testing 2
2016-05-12 10:07:02,973 Worker-2   W0 INFO         testing 3
2016-05-12 10:07:02,973 Worker-2   W0 INFO         testing 4
2016-05-12 10:07:02,973 Worker-2   W0 INFO         testing 5
2016-05-12 10:07:02,973 Worker-2   W0 INFO         testing 6
2016-05-12 10:07:02,973 Worker-2   W0 INFO         testing 7
2016-05-12 10:07:02,973 Worker-2   W0 INFO         testing 8
2016-05-12 10:07:02,973 Worker-2   W0 INFO         testing 9
2016-05-12 10:07:02,973 Worker-2   W0 INFO         Completed 0
Finish
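
As a side note, if you want to keep the QueueListener approach from your EDIT2, the usual pattern (here a minimal sketch based on the logutils API, which mirrors logging.handlers.QueueListener added in Python 3.2) is to give the root logger of each worker only the QueueHandler, start the listener before spawning the workers, and stop it after they have joined. Starting it only after the joins, as in EDIT2, leaves nothing draining the queue, which is also the likely reason the joins hang once you have enough workers.

import logging
from multiprocessing import Process, Queue as mpQueue

from logutils.queue import QueueHandler, QueueListener


def worker(q, n):
    root = logging.getLogger()
    root.handlers = []                  # drop handlers inherited from the parent on fork
    root.addHandler(QueueHandler(q))    # the only handler in the child: push records to the queue
    root.setLevel(logging.DEBUG)
    logger = logging.getLogger("W%i" % n)
    logger.info("Worker %i Starting", n)
    for i in xrange(10):
        logger.info("testing %i", i)
    logger.info("Completed %i", n)


if __name__ == "__main__":
    q = mpQueue(-1)
    h = logging.StreamHandler()
    h.setFormatter(logging.Formatter(
        '%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s'))

    listener = QueueListener(q, h)
    listener.start()                    # start draining the queue *before* the workers run

    workers = [Process(target=worker, args=(q, i)) for i in xrange(15)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()                        # joins no longer hang: the listener keeps consuming the queue

    listener.stop()                     # flush remaining records and stop the listener thread

This way each record is formatted and written exactly once, in the listener's thread in the main process.
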
answered Sep 20 '22 by Gerard Yin


I figured out a pretty simple workaround using monkeypatching. It probably isn't robust, and I am not an expert with the logging module, but it seemed like the best solution for my situation. After trying some code changes (to enable passing in an existing logger from multiprocessing.get_logger()) I didn't like how much the code was changing, and came up with a quick (well, it would have been quick had I done this in the first place), easy-to-read hack/workaround:

(working example, complete with multiprocessing pool)

import logging
import multiprocessing

class FakeLogger(object):
    def __init__(self, q):
        self.q = q
    def info(self, item):
        self.q.put('INFO - {}'.format(item))
    def debug(self, item):
        self.q.put('DEBUG - {}'.format(item))
    def critical(self, item):
        self.q.put('CRITICAL - {}'.format(item))
    def warning(self, item):
        self.q.put('WARNING - {}'.format(item))

def some_other_func_that_gets_logger_and_logs(num):
    # notice the name gets discarded
    # of course you can easily add this to your FakeLogger class
    local_logger = logging.getLogger('local')
    local_logger.info('Hey I am logging this: {} and working on it to make this {}!'.format(num, num*2))
    local_logger.debug('hmm, something may need debugging here')
    return num*2

def func_to_parallelize(data_chunk):
    # unpack our args
    the_num, logger_q = data_chunk
    # since we're now in a new process, let's monkeypatch the logging module
    logging.getLogger = lambda name=None: FakeLogger(logger_q)
    # now do the actual work that happens to log stuff too
    new_num = some_other_func_that_gets_logger_and_logs(the_num)
    return (the_num, new_num)

if __name__ == '__main__':
    multiprocessing.freeze_support()
    m = multiprocessing.Manager()
    logger_q = m.Queue()
    # we have to pass our data to be parallel-processed
    # we also need to pass the Queue object so we can retrieve the logs
    parallelable_data = [(1, logger_q), (2, logger_q)]
    # set up a pool of processes so we can take advantage of multiple CPU cores
    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=4)
    worker_output = pool.map(func_to_parallelize, parallelable_data)
    pool.close() # no more tasks
    pool.join()  # wrap up current tasks
    # get the contents of our FakeLogger object
    while not logger_q.empty():
        print logger_q.get()
    print 'worker output contained: {}'.format(worker_output)

Of course this is probably not going to cover the whole gamut of logging usage, but I think the concept is simple enough here to get working quickly and relatively painlessly. And it should be easy to modify (for example, the lambda currently discards the name that can be passed into getLogger).
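
For instance, a small variant of the FakeLogger that keeps that name as a prefix could look something like this (just a sketch, untested against the pool example above):

class FakeLogger(object):
    def __init__(self, q, name=None):
        self.q = q
        self.name = name or 'root'

    def _put(self, level, item):
        # prepend the level and the logger name that getLogger() was called with
        self.q.put('{} - {} - {}'.format(level, self.name, item))

    def info(self, item):
        self._put('INFO', item)

    def debug(self, item):
        self._put('DEBUG', item)

    def warning(self, item):
        self._put('WARNING', item)

    def critical(self, item):
        self._put('CRITICAL', item)

# and in func_to_parallelize, pass the name through:
logging.getLogger = lambda name=None: FakeLogger(logger_q, name)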

answered Sep 22 '22 by nmz787