Do I need to explicitly pass multiprocessing.Queue instance variables to a child Process executing an instance method?

I have a few basic questions about using Python's multiprocessing module:

import multiprocessing

class Someparallelworkerclass(object):

    def __init__(self):
        self.num_workers = 4
        self.work_queue = multiprocessing.JoinableQueue()
        self.result_queue = multiprocessing.JoinableQueue()

    def someparallellazymethod(self):
        p = multiprocessing.Process(target=self.worktobedone)
        p.start()

    def worktobedone(self):
        # get data from work_queue
        # put back result in result_queue
        pass

Is it necessary to pass work_queue and result_queue as args to Process? Does the answer depend on the OS? The more fundamental question is: does the child process get a copied (copy-on-write) address space from the parent process, and hence know the definition of the class and its methods? If so, how does it know that the queues are to be shared for IPC, and that it shouldn't make duplicates of work_queue and result_queue in the child process? I tried searching online, but most of the documentation I found was vague and didn't go into enough detail about what exactly is happening underneath.

asked Oct 06 '14 by user179156



2 Answers

It's actually not necessary to include the queues in the args argument in this case, no matter what platform you're using. The reason is that even though it doesn't look like you're explicitly passing the two JoinableQueue instances to the child, you actually are - via self. Because self is explicitly being passed to the child, and the two queues are a part of self, they end up being passed along to the child.
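A quick way to convince yourself is a minimal, self-contained sketch along these lines (the Worker class and method names are illustrative, not from the question):

import multiprocessing

class Worker(object):
    def __init__(self):
        self.work_queue = multiprocessing.JoinableQueue()
        self.result_queue = multiprocessing.JoinableQueue()

    def start_worker(self):
        # No explicit queue arguments: the bound method carries self,
        # and self carries both queues to the child.
        p = multiprocessing.Process(target=self.work)
        p.start()
        return p

    def work(self):
        item = self.work_queue.get()
        self.result_queue.put(item * 2)
        self.work_queue.task_done()

if __name__ == '__main__':
    w = Worker()
    w.work_queue.put(21)
    p = w.start_worker()
    print(w.result_queue.get())  # prints 42
    p.join()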

On Linux, this passing of self happens via os.fork(), which means that the file descriptors used by the multiprocessing.connection.Connection objects that the Queue uses internally for inter-process communication are inherited by the child (not copied). Other parts of the Queue become copy-on-write, but that's ok; multiprocessing.Queue is designed so that none of the pieces that need to be copied actually need to stay in sync between the two processes. In fact, many of the internal attributes get reset after the fork occurs:

def _after_fork(self):
    debug('Queue._after_fork()')
    self._notempty = threading.Condition(threading.Lock())
    self._buffer = collections.deque()
    self._thread = None
    self._jointhread = None
    self._joincancelled = False
    self._closed = False
    self._close = None
    self._send = self._writer.send  # _writer is a Connection object
    self._recv = self._reader.recv
    self._poll = self._reader.poll

So that covers Linux. How about Windows? Windows doesn't have fork, so it will need to pickle self to send it to the child, and that includes pickling our Queues. Now, normally if you try to pickle a multiprocessing.Queue, it fails:

>>> import multiprocessing
>>> q = multiprocessing.Queue()
>>> import pickle
>>> pickle.dumps(q)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python2.7/pickle.py", line 1374, in dumps
    Pickler(file, protocol).dump(obj)
  File "/usr/local/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/local/lib/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
  File "/usr/local/lib/python2.7/copy_reg.py", line 84, in _reduce_ex
    dict = getstate()
  File "/usr/local/lib/python2.7/multiprocessing/queues.py", line 77, in __getstate__
    assert_spawning(self)
  File "/usr/local/lib/python2.7/multiprocessing/forking.py", line 52, in assert_spawning
    ' through inheritance' % type(self).__name__
RuntimeError: Queue objects should only be shared between processes through inheritance

But this is actually an artificial limitation. multiprocessing.Queue objects can be pickled in some cases - how else could they be sent to child processes in Windows? And indeed, we can see that if we look at the implementation:

def __getstate__(self):
    assert_spawning(self)
    return (self._maxsize, self._reader, self._writer,
            self._rlock, self._wlock, self._sem, self._opid)

def __setstate__(self, state):
    (self._maxsize, self._reader, self._writer,
     self._rlock, self._wlock, self._sem, self._opid) = state
    self._after_fork()

__getstate__, which is called when pickling an instance, has an assert_spawning call in it, which makes sure we're actually spawning a process while attempting the pickle*. __setstate__, which is called while unpickling, is responsible for calling _after_fork.
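Under an actual spawning start method, that same pickle succeeds. Here is a small sketch of that path; multiprocessing.set_start_method is Python 3 only, so this goes beyond the Python 2.7 sources quoted above:

import multiprocessing

def produce(q):
    # Runs in the child: the queue arrived here by being pickled
    # via __getstate__/__setstate__ as part of the spawn machinery.
    q.put('hello from the child')

if __name__ == '__main__':
    # Force the Windows-style 'spawn' path on any OS (Python 3.4+).
    multiprocessing.set_start_method('spawn')
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=produce, args=(q,))
    p.start()
    print(q.get())  # 'hello from the child'
    p.join()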

So how are the Connection objects used by the queues maintained when we have to pickle? It turns out there's a multiprocessing sub-module that does exactly that - multiprocessing.reduction. The comment at the top of the module states it pretty clearly:

#
# Module to allow connection and socket objects to be transferred
# between processes
#

On Windows, the module ultimately uses the DuplicateHandle API provided by Windows to create a duplicate handle that the child process' Connection object can use. So while each process gets its own handle, they're exact duplicates - any action made on one is reflected on the other:

The duplicate handle refers to the same object as the original handle. Therefore, any changes to the object are reflected through both handles. For example, if you duplicate a file handle, the current file position is always the same for both handles.

* See this answer for more information about assert_spawning

answered Oct 21 '22 by dano

The child process doesn't have the queues in its closure; its instances of the queues reference different areas of memory. When using queues the way you intend, you must pass them as args to the function. One solution I like is to use functools.partial to curry your functions with the queues you want, adding them permanently to the function's closure and letting you spin up multiple processes that perform the same task with the same IPC channel, as in the sketch below.
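A rough sketch of that functools.partial pattern (the worker function, the squaring task, and the None sentinel are illustrative choices, not from the answer):

import functools
import multiprocessing

def worker(work_queue, result_queue):
    # A plain module-level function: both queues arrive explicitly.
    while True:
        item = work_queue.get()
        if item is None:  # sentinel: this worker is done
            work_queue.task_done()
            break
        result_queue.put(item * item)
        work_queue.task_done()

if __name__ == '__main__':
    work_q = multiprocessing.JoinableQueue()
    result_q = multiprocessing.Queue()
    # Bake the queues into the function once, then reuse the curried
    # callable for every worker sharing the same IPC channels.
    task = functools.partial(worker, work_q, result_q)
    procs = [multiprocessing.Process(target=task) for _ in range(4)]
    for p in procs:
        p.start()
    for n in range(10):
        work_q.put(n)
    for _ in procs:
        work_q.put(None)
    work_q.join()
    for p in procs:
        p.join()
    print(sorted(result_q.get() for _ in range(10)))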

answered Oct 21 '22 by ragingSloth