Python multiprocessing.Queue modifies objects

I have an application that implements something like a Chain of Responsibility in Python. One process passes objects via multiprocessing.Queue() to other processes, which then perform actions on them. The last-modified time of each passed object also needs to be tracked, so that action is taken only when the object has actually been modified.

The problem I am experiencing is that the _modified attribute of the object appears to change randomly after it is extracted from the queue, while the _mtime attribute is always correct. The example below will run and (intentionally) randomly modify the DummyObject, then place it on the queue for each of the handler processes. Each handler then prints the _modified and _mtime values it received in the object. I expect the _modified value to be the same in both command_func and the handler functions; however, that is usually not the case. If I remove the Object_w_mtime inheritance from DummyObject, then I do not see any differences between the sent and received objects.

I'm relatively new to Python. To the best of my knowledge, what should be happening is that each time an object is placed on a queue, it is pickled and sent over a pipe to the receiving process, which unpickles it. Is that correct? Is there any way the object's inheritance could be messed up when the object is pickled/unpickled?
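For what it's worth, that mental model of put()/get() can be checked in isolation: pickling and unpickling preserves both the attributes and the inheritance. A minimal sketch (reusing the question's tracking class, written with Python 3 print syntax):

```python
import pickle
import time

class Object_w_mtime(object):
    """Same modification-tracking class as in the question."""
    def __setattr__(self, name, value):
        if (name not in ('_mtime', '_modified')
                and value != getattr(self, name, None)):
            object.__setattr__(self, '_modified', True)
            object.__setattr__(self, '_mtime', time.time())
        object.__setattr__(self, name, value)

class DummyObject(Object_w_mtime):
    def __init__(self):
        self.value = 10

obj = DummyObject()
# A pickle round trip is what Queue.put()/get() do internally.
clone = pickle.loads(pickle.dumps(obj))

print(isinstance(clone, Object_w_mtime))  # True: inheritance survives
print(clone._modified == obj._modified)   # True
print(clone._mtime == obj._mtime)         # True
```

So the round trip itself is faithful; the randomness has to come from somewhere else.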

I tested this with Python 2.7.2 and 2.6.7 on Ubuntu 11.10, as well as Python 2.7.1 on Ubuntu 11.04. Sometimes you have to let it run for a minute or so to see the behavior, as it appears to be random.

Grasping at straws here, thanks in advance.

import multiprocessing
import time
import traceback
import os
import random

class Object_w_mtime(object):
    '''
    Parent object that tracks the last time an attribute was modified
    '''
    def __setattr__(self,a_name,a_value):
        if ((a_name not in ('_mtime','_modified')) and
            (a_value != getattr(self,a_name,None))
        ):
            object.__setattr__(self, '_modified', True)
            object.__setattr__(self, '_mtime', time.time())
        object.__setattr__(self, a_name, a_value)
        return True
    #END def

    def reset(self):
        self._modified = False
#END class

class DummyObject(Object_w_mtime):
    def __init__(self):
        self.value = 10

def handler(in_queue = None, handler_id = None):
    print 'PID:' + str(os.getpid()) + ':handler{0}:<RUN>'.format(handler_id)
    while True:
        try:
            obj = in_queue.get(True,61)
            print 'handler{0} - _modified'.format(handler_id), obj._modified, ' \t_mtime', obj._mtime
        except multiprocessing.queues.Empty:
            break
        except KeyboardInterrupt:
            break
        except Exception as e:
            print traceback.format_exc()
            break
    return True
#END def

def command_func(next_links = None):
    print 'PID:' + str(os.getpid()) + ':command_func:<RUN>'
    obj = DummyObject()
    while True:
        try:
            # randomly assign a different value to test with a modified and unmodified object
            obj.value = random.randint(0,1)
            print '**************** obj.value = {0} ***************'.format(obj.value)
            print 'command_ - _modified', obj._modified, ' \t_mtime', obj._mtime
            for each in next_links:
                each.put(obj,False)
        except multiprocessing.queues.Full:
            break
        except KeyboardInterrupt:
            break
        except Exception as e:
            print e
            print traceback.format_exc()
            break
        obj.reset()
        time.sleep(3)
    return True
#END def


if __name__ == '__main__':
    handler_queues = list()
    handler_processes = list()
    # Create a queue and process object for each command handler
    for handler_id in range(1,4):
        queue = multiprocessing.Queue()
        process = multiprocessing.Process(target=handler, args=(queue, handler_id))
        handler_queues.append(queue)
        handler_processes.append(process)

    try:
        # spawn handler processes
        for process in handler_processes:
            process.start()
        # Start sending commands to handlers
        command_func(handler_queues)

    # exit on keyboard interrupt
    except KeyboardInterrupt:
        for process in handler_processes:
            process.join()
    except Exception:
        traceback.print_exc()
asked Jan 24 '12 by Jason Martens

1 Answer

In short, you modify obj after putting it on the queue.

Looking at http://svn.python.org/view/python/trunk/Lib/multiprocessing/queues.py?revision=76434&view=markup line 285, put() merely places the object in an internal buffer and, if one is not already running, launches a background feeder thread that pickles buffered objects and writes them to the underlying pipe. Thus there is a race between each.put(obj,False) and obj.reset() in your code: if reset() (or the next assignment to obj.value) runs before the feeder thread has pickled the object, the handler receives the mutated state.
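The race is easy to reproduce in miniature (a sketch; the outcome is timing-dependent by nature, which is exactly the randomness you observed):

```python
import multiprocessing
import time

class Box(object):
    def __init__(self, value):
        self.value = value

q = multiprocessing.Queue()
box = Box('original')
q.put(box)             # only buffers the object; a feeder thread pickles it later
box.value = 'mutated'  # may run before or after the feeder thread serializes
time.sleep(0.5)        # give the feeder thread time to flush
received = q.get()
print(received.value)  # sometimes 'original', sometimes 'mutated'
```

Which value comes out depends on whether the mutation beats the feeder thread's pickling, so no deterministic output can be promised.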

You should probably only put immutable objects, or private copies, on a Queue.
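One way to follow that advice without restructuring the code is to put a deep copy on each queue, so that later calls to reset() (or new value assignments) cannot touch the snapshot being serialized. A sketch using the question's tracking class:

```python
import copy
import time

class Object_w_mtime(object):
    """Same modification-tracking class as in the question."""
    def __setattr__(self, name, value):
        if (name not in ('_mtime', '_modified')
                and value != getattr(self, name, None)):
            object.__setattr__(self, '_modified', True)
            object.__setattr__(self, '_mtime', time.time())
        object.__setattr__(self, name, value)

    def reset(self):
        self._modified = False

obj = Object_w_mtime()
obj.value = 1                  # marks the object as modified
snapshot = copy.deepcopy(obj)  # what you would actually put() on the queue
obj.reset()                    # the original can now change freely

print(snapshot._modified)  # True: the snapshot keeps its put-time state
print(obj._modified)       # False
```

In command_func that would mean each.put(copy.deepcopy(obj), False) instead of each.put(obj, False), at the cost of one copy per handler queue.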

answered Oct 02 '22 by Tuure Laurinolli