I'd like to know how multiprocessing is done right. Assuming I have a list [1, 2, 3, 4, 5] generated by function f1, which is written to a Queue (left green circle). Now I start two processes pulling from that queue (by executing f2 in the processes). They process the data, say: doubling the value, and write it to the second queue. Now, function f3 reads this data and prints it out.

Inside the functions there is a kind of loop, trying to read from the queue forever. How do I stop this process?
Idea 1

f1 does not only send the list, but also a None object or a custom object (class PipelineTerminator: pass, or some such) which is just propagated all the way down. f3 then waits for the None to arrive and, when it's there, breaks out of the loop. Problem: it's possible that one of the two f2s reads and propagates the None while the other one is still processing a number. Then the last value is lost.
Idea 2

f3 is f1. So the function f1 generates the data and the pipes, spawns the processes with f2, and feeds in all the data. After spawning and feeding, it listens on the second pipe, simply counting and processing the received objects. Because it knows how much data it fed, it can terminate the processes executing f2. But if the goal is to set up a processing pipeline, the different steps should be separable. So f1, f2 and f3 are different elements of a pipeline, and the expensive steps are done in parallel.
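A minimal runnable sketch of this idea, assuming None as the termination marker; the function names follow the description above, while num_workers and the data argument are my own additions:

```python
import multiprocessing as mp

def f2(inq, outq):
    # Worker: double values until the None sentinel arrives.
    while True:
        val = inq.get()
        if val is None:
            break
        outq.put(val * 2)

def f1(data, num_workers=2):
    # Driver (Idea 2): f1 feeds the queue, spawns the f2 workers, and
    # collects the results itself. Because it knows how many items it
    # fed, it knows how many results to wait for before shutting down.
    inq, outq = mp.Queue(), mp.Queue()
    workers = [mp.Process(target=f2, args=(inq, outq))
               for _ in range(num_workers)]
    for w in workers:
        w.start()
    for item in data:
        inq.put(item)
    results = [outq.get() for _ in data]   # count the replies
    for _ in workers:                      # now terminate the workers
        inq.put(None)
    for w in workers:
        w.join()
    return results

if __name__ == '__main__':
    print(sorted(f1([1, 2, 3, 4, 5])))     # → [2, 4, 6, 8, 10]
```

Results are sorted because the two workers may finish items in any order.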
Idea 3

Each piece of the pipeline is a function; this function spawns processes as it likes and is responsible for managing them. It knows how much data came in and how much data has been returned (with yield, maybe), so it's safe to propagate a None object:
    setup child processes

    execute thread one and two and wait until both finished

    thread 1:
        while True:
            pull from input queue
            if None: break and set finished_flag
            else: push to queue1 and increment counter1

    thread 2:
        while True:
            pull from queue2
            increment counter2
            yield result
            if counter1 == counter2 and finished_flag: break

    when both threads finished: kill process pool and return.

(Instead of using threads, maybe one can think of a smarter solution.)
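A runnable sketch of such a self-managing stage. It is simplified relative to the pseudocode: the whole input is fed before results are yielded, instead of overlapping the two with threads. The names stage, _worker and double are my own:

```python
import multiprocessing as mp

def _worker(func, inq, outq):
    # Apply func to each item until the None sentinel arrives.
    while True:
        val = inq.get()
        if val is None:
            break
        outq.put(func(val))

def stage(source, func, num_workers=2):
    # One self-managing pipeline element: it spawns its own workers,
    # counts how much data came in, and therefore knows exactly how
    # many results to yield before shutting its workers down.
    inq, outq = mp.Queue(), mp.Queue()
    workers = [mp.Process(target=_worker, args=(func, inq, outq))
               for _ in range(num_workers)]
    for w in workers:
        w.start()
    count = 0
    for item in source:        # feed everything first (simplification:
        inq.put(item)          # the two-thread version would overlap
        count += 1             # feeding and yielding)
    for _ in workers:
        inq.put(None)          # one sentinel per worker, none is lost
    for _ in range(count):
        yield outq.get()
    for w in workers:
        w.join()

def double(v):
    return v * 2

if __name__ == '__main__':
    # Stages plug together like generators; order may vary per worker.
    print(sorted(stage(stage([1, 2, 3, 4, 5], double), double)))
```

Because each stage manages its own sentinels and counts, chaining stages never loses the last value, which was the problem with Idea 1.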
So ...

I have implemented a solution following Idea 2, feeding and waiting for the results to arrive, but it was not really a pipeline with independent functions plugged together. It worked for the task I had to manage, but was hard to maintain.

I'd like to hear from you now how you implement pipelines (easy in one process with generator functions and so on, but how with multiple processes?) and how you usually manage them.
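For contrast, here is the single-process generator version that the question alludes to; each stage consumes the previous one lazily, so no queues or sentinels are needed:

```python
def f1():
    # Source stage: emit the input values.
    yield from [1, 2, 3, 4, 5]

def f2(values):
    # Processing stage: double each value.
    for v in values:
        yield v * 2

def f3(values):
    # Sink stage: print each result.
    for v in values:
        print(v)

f3(f2(f1()))   # prints 2, 4, 6, 8, 10
```

The difficulty the question raises is exactly that this plug-together style does not carry over directly to multiple processes, where termination must be signaled explicitly.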
With the MPipe module, simply do this:

    from mpipe import OrderedStage, Pipeline

    def f1(value):
        return value * 2

    def f2(value):
        print(value)

    s1 = OrderedStage(f1, size=2)
    s2 = OrderedStage(f2)
    p = Pipeline(s1.link(s2))

    for task in 1, 2, 3, 4, 5, None:
        p.put(task)
The above runs 4 processes: two for the first stage (because of size=2), one for the second stage, plus one for the main program.
The MPipe cookbook offers some explanation of how processes are shut down internally using None as the last task.
To run the code, install MPipe:
virtualenv venv
venv/bin/pip install mpipe
venv/bin/python prog.py
Output:
2
4
6
8
10
For Idea 1, how about:
    import multiprocessing as mp

    sentinel = None

    def f2(inq, outq):
        # Worker: double each value until the sentinel arrives.
        while True:
            val = inq.get()
            if val is sentinel:
                break
            outq.put(val * 2)

    def f3(outq):
        # Printer: consume results until the sentinel arrives.
        while True:
            val = outq.get()
            if val is sentinel:
                break
            print(val)

    def f1():
        num_workers = 2
        inq = mp.Queue()
        outq = mp.Queue()
        for i in range(5):
            inq.put(i)
        for i in range(num_workers):
            inq.put(sentinel)           # one sentinel per worker
        workers = [mp.Process(target=f2, args=(inq, outq))
                   for i in range(num_workers)]
        printer = mp.Process(target=f3, args=(outq,))
        for w in workers:
            w.start()
        printer.start()
        for w in workers:
            w.join()                    # wait until all workers are done ...
        outq.put(sentinel)              # ... then tell f3 to stop
        printer.join()

    if __name__ == '__main__':
        f1()
The only difference from the description of Idea 1 is that f2 breaks out of the while-loop when it receives the sentinel (thus terminating itself). f1 blocks until the workers are done (using w.join()) and then sends f3 the sentinel (signaling that it should break out of its while-loop).