I have a few basic questions about using Python's multiprocessing module:
    import multiprocessing

    class Someparallelworkerclass(object):

        def __init__(self):
            self.num_workers = 4
            self.work_queue = multiprocessing.JoinableQueue()
            self.result_queue = multiprocessing.JoinableQueue()

        def someparallellazymethod(self):
            p = multiprocessing.Process(target=self.worktobedone)
            p.start()

        def worktobedone(self):
            # get data from work_queue
            # put back result in result_queue
            pass
Is it necessary to pass work_queue and result_queue as args to Process? Does the answer depend on the OS? The more fundamental question is: does the child process get a copied (COW) address space from the parent process, and hence know the definition of the class/class method? If yes, how does it know that the queues are to be shared for IPC, and that it shouldn't make duplicates of the work_queue and result_queue in the child process? I tried searching this online but most of the documentation I found was vague and didn't go into enough detail as to what exactly is happening underneath.
It's actually not necessary to include the queues in the args argument in this case, no matter what platform you're using. The reason is that even though it doesn't look like you're explicitly passing the two JoinableQueue instances to the child, you actually are - via self. Because target=self.worktobedone is a bound method, self is passed to the child along with it, and since the two queues are attributes of self, they end up being passed along to the child as well.
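A minimal runnable sketch of that pattern (the class and method names here are illustrative, not from the question) - the child can use both queues even though they never appear in args, because they travel with self:

    import multiprocessing

    class ParallelWorker(object):

        def __init__(self):
            self.work_queue = multiprocessing.JoinableQueue()
            self.result_queue = multiprocessing.JoinableQueue()

        def work(self):
            # Runs in the child; self (and therefore both queues) came
            # along with the bound method self.work.
            item = self.work_queue.get()
            self.result_queue.put(item * 2)
            self.work_queue.task_done()

        def run(self):
            p = multiprocessing.Process(target=self.work)
            p.start()
            self.work_queue.put(21)
            self.work_queue.join()          # returns once the child calls task_done()
            print(self.result_queue.get())  # 42
            p.join()

    if __name__ == '__main__':
        ParallelWorker().run()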
On Linux, this happens via os.fork(), which means that the file descriptors used by the multiprocessing.connection.Connection objects that the Queue uses internally for inter-process communication are inherited by the child (not copied). Other parts of the Queue become copy-on-write, but that's ok; multiprocessing.Queue is designed so that none of the pieces that need to be copied actually need to stay in sync between the two processes. In fact, many of the internal attributes get reset after the fork occurs:
    def _after_fork(self):
        debug('Queue._after_fork()')
        self._notempty = threading.Condition(threading.Lock())
        self._buffer = collections.deque()
        self._thread = None
        self._jointhread = None
        self._joincancelled = False
        self._closed = False
        self._close = None
        self._send = self._writer.send  # _writer is a Connection object
        self._recv = self._reader.recv
        self._poll = self._reader.poll
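As an aside, on Python 3.4+ you can check (or override) the mechanism used to start children by inspecting the start method; a quick sketch:

    import multiprocessing

    if __name__ == '__main__':
        # 'fork' on Linux; 'spawn' on Windows (and on macOS since Python 3.8).
        print(multiprocessing.get_start_method())

        # Forcing 'spawn' everywhere is a handy way to verify that code which
        # relies on fork-style inheritance also survives the pickling path:
        # multiprocessing.set_start_method('spawn')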
So that covers Linux. How about Windows? Windows doesn't have fork, so it will need to pickle self to send it to the child, and that includes pickling our Queues. Now, normally if you try to pickle a multiprocessing.Queue, it fails:
    >>> import multiprocessing
    >>> q = multiprocessing.Queue()
    >>> import pickle
    >>> pickle.dumps(q)
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/usr/local/lib/python2.7/pickle.py", line 1374, in dumps
        Pickler(file, protocol).dump(obj)
      File "/usr/local/lib/python2.7/pickle.py", line 224, in dump
        self.save(obj)
      File "/usr/local/lib/python2.7/pickle.py", line 306, in save
        rv = reduce(self.proto)
      File "/usr/local/lib/python2.7/copy_reg.py", line 84, in _reduce_ex
        dict = getstate()
      File "/usr/local/lib/python2.7/multiprocessing/queues.py", line 77, in __getstate__
        assert_spawning(self)
      File "/usr/local/lib/python2.7/multiprocessing/forking.py", line 52, in assert_spawning
        ' through inheritance' % type(self).__name__
    RuntimeError: Queue objects should only be shared between processes through inheritance
But this is actually an artificial limitation. multiprocessing.Queue objects can be pickled in some cases - how else could they be sent to child processes on Windows? And indeed, we can see that if we look at the implementation:
    def __getstate__(self):
        assert_spawning(self)
        return (self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid)

    def __setstate__(self, state):
        (self._maxsize, self._reader, self._writer,
         self._rlock, self._wlock, self._sem, self._opid) = state
        self._after_fork()
__getstate__, which is called when pickling an instance, has an assert_spawning call in it, which makes sure we're actually spawning a process while attempting the pickle*. __setstate__, which is called while unpickling, is responsible for calling _after_fork.
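Put differently: the pickling that fails at the interactive prompt succeeds when multiprocessing itself performs it while spawning a child, which is also why explicitly passing a queue through args works fine. A minimal illustrative sketch:

    import multiprocessing

    def worker(q):
        # q arrived via pickling (spawn) or fork inheritance; either way it
        # is the same underlying IPC channel as the parent's q.
        q.put('hello from the child')

    if __name__ == '__main__':
        q = multiprocessing.Queue()
        # pickle.dumps(q) would raise RuntimeError here, but the pickling
        # multiprocessing does while spawning passes assert_spawning.
        p = multiprocessing.Process(target=worker, args=(q,))
        p.start()
        print(q.get())
        p.join()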
So how are the Connection objects used by the queues maintained when we have to pickle? It turns out there's a multiprocessing sub-module that does exactly that - multiprocessing.reduction. The comment at the top of the module states it pretty clearly:
    #
    # Module to allow connection and socket objects to be transferred
    # between processes
    #
On Windows, the module ultimately uses the DuplicateHandle API provided by Windows to create a duplicate handle that the child process' Connection object can use. So while each process gets its own handle, they're exact duplicates - any action made on one is reflected on the other:
The duplicate handle refers to the same object as the original handle. Therefore, any changes to the object are reflected through both handles. For example, if you duplicate a file handle, the current file position is always the same for both handles.
* See this answer for more information about assert_spawning
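This transfer machinery is also what makes it possible to send a Connection itself through a Queue to an already-running process; the file descriptor (Unix) or handle (Windows) is duplicated in flight. A sketch of what I'd expect to work on Python 3:

    import multiprocessing

    def child(q):
        conn = q.get()   # rebuilt here by multiprocessing.reduction
        conn.send('hello via a transferred Connection')

    if __name__ == '__main__':
        q = multiprocessing.Queue()
        parent_conn, child_conn = multiprocessing.Pipe()
        p = multiprocessing.Process(target=child, args=(q,))
        p.start()
        q.put(child_conn)   # the underlying fd/handle is duplicated for the child
        print(parent_conn.recv())
        p.join()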
The child process doesn't have the queues in its closure; its instances of the queues reference different areas of memory. When using queues the way you intend, you must pass them as args to the function. One solution I like is to use functools.partial to curry your functions with the queues you want, adding them permanently to the functions' closures and letting you spin up multiple processes to perform the same task with the same IPC channel.
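A sketch of that functools.partial approach (the function and queue names are illustrative):

    import functools
    import multiprocessing

    def worker(work_queue, result_queue):
        item = work_queue.get()
        result_queue.put(item + 1)

    if __name__ == '__main__':
        work_queue = multiprocessing.Queue()
        result_queue = multiprocessing.Queue()

        # Bake the queues into the callable once; every process started
        # from `task` shares the same two IPC channels.
        task = functools.partial(worker, work_queue, result_queue)

        procs = [multiprocessing.Process(target=task) for _ in range(2)]
        for p in procs:
            p.start()
        work_queue.put(1)
        work_queue.put(2)
        print(sorted(result_queue.get() for _ in range(2)))  # [2, 3]
        for p in procs:
            p.join()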