Multiprocessing in a pipeline done right

I'd like to know how multiprocessing is done right. Assuming I have a list [1,2,3,4,5] generated by function f1, which is written to a Queue (left green circle). Now I start two processes pulling from that queue (by executing f2 in the processes). They process the data, say by doubling each value, and write it to the second queue. Now, function f3 reads this data and prints it out.

[Figure: layout of the data flow]

Inside the functions there is a kind of loop that tries to read from the queue forever. How do I stop these processes?

Idea 1

f1 does not only send the list, but also a None object or a custom object, class PipelineTerminator: pass or some such, which is simply propagated all the way down. f3 now waits for the None to arrive; when it does, it breaks out of the loop. Problem: it's possible that one of the two f2s reads and propagates the None while the other one is still processing a number. Then the last value is lost.
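For concreteness, a minimal sketch of this naive propagation (the time.sleep is just a stand-in for real work, and daemon=True is an addition so the demo can exit despite the stranded worker). Depending on timing, f3 may stop before the last doubled value arrives:

import multiprocessing as mp
import time

def f2(inq, outq):
    while True:
        val = inq.get()
        if val is None:
            outq.put(None)      # propagate the terminator downstream
            break
        time.sleep(0.1)         # stand-in for real work
        outq.put(val * 2)

def f3(outq):
    while True:
        val = outq.get()
        if val is None:         # stops at the FIRST None it sees
            break
        print(val)

if __name__ == '__main__':
    inq, outq = mp.Queue(), mp.Queue()
    workers = [mp.Process(target=f2, args=(inq, outq), daemon=True)
               for _ in range(2)]
    printer = mp.Process(target=f3, args=(outq,))
    for p in workers:
        p.start()
    printer.start()
    for x in [1, 2, 3, 4, 5]:
        inq.put(x)
    inq.put(None)   # one terminator for two workers: whichever worker grabs it
                    # forwards None while its sibling may still be doubling a
                    # value, so f3 can exit with that result unprinted (and the
                    # sibling stays blocked on inq.get(), hence daemon=True)
    printer.join()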

Idea 2

f3 is f1. So the function f1 generates the data and the pipes, spawns the processes with f2, and feeds in all the data. After spawning and feeding, it listens on the second pipe, simply counting and processing the received objects. Because it knows how much data it fed, it can terminate the processes executing f2. But if the goal is to set up a processing pipeline, the different steps should be separable. So f1, f2 and f3 are different elements of a pipeline, and the expensive steps are done in parallel.
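A minimal sketch of this idea (function names as above; the worker count of 2 is an arbitrary choice): f1 feeds everything, reads back exactly as many results as it fed, and then terminates the workers, which never see a stop signal themselves. Terminating blocked workers is abrupt, but safe here because the queues are no longer needed afterwards:

import multiprocessing as mp

def f2(inq, outq):
    while True:                  # loops forever; f1 terminates it externally
        outq.put(inq.get() * 2)

def f1():
    inq, outq = mp.Queue(), mp.Queue()
    workers = [mp.Process(target=f2, args=(inq, outq)) for _ in range(2)]
    for w in workers:
        w.start()
    data = [1, 2, 3, 4, 5]
    for x in data:
        inq.put(x)
    for _ in range(len(data)):   # f1 knows how much data it fed, so it knows
        print(outq.get())        # exactly how many results to wait for
    for w in workers:
        w.terminate()            # workers are blocked in inq.get(); kill them
        w.join()

if __name__ == '__main__':
    f1()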

Idea 3

[Figure: pipeline idea 3]

Each piece of the pipeline is a function. This function spawns processes as it likes and is responsible for managing them. It knows how much data came in and how much data has been returned (with yield, maybe). So it's safe to propagate a None object.

set up child processes

execute threads one and two and wait until both have finished

thread 1:
    while True:
        pull from input queue
        if None: break and set finished_flag
        else: push to queue1 and increment counter1

thread 2:
    while True:
        pull from queue2
        increment counter2
        yield result
        if counter1 == counter2 and finished_flag: break

when both threads have finished: kill the process pool and return.

(Instead of using threads, maybe one can think of a smarter solution.)
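Here is one way to make Idea 3 concrete as a self-contained stage function (a sketch, not the only possible design). Instead of the two counters and the finished_flag, each worker echoes the sentinel it receives, so the draining side knows everything has arrived once it has seen one echo per worker; this also avoids the race between incrementing counter1 and setting finished_flag:

import multiprocessing as mp
import threading

def worker(q1, q2):
    for val in iter(q1.get, None):   # run until this worker's sentinel arrives
        q2.put(val * 2)
    q2.put(None)                     # echo the sentinel: "I am done"

def stage(pull, push, num_workers=2):
    # One pipeline element: owns its worker pool and its own shutdown.
    q1, q2 = mp.Queue(), mp.Queue()
    pool = [mp.Process(target=worker, args=(q1, q2)) for _ in range(num_workers)]
    for p in pool:
        p.start()

    def feeder():
        # thread 1: forward input until the upstream sentinel arrives,
        # then hand one sentinel to each worker
        for val in iter(pull.get, None):
            q1.put(val)
        for _ in pool:
            q1.put(None)

    t = threading.Thread(target=feeder)
    t.start()

    # thread 2's role, run inline: forward results until every worker
    # has echoed its sentinel
    finished = 0
    while finished < len(pool):
        val = q2.get()
        if val is None:
            finished += 1
        else:
            push.put(val)

    t.join()
    for p in pool:
        p.join()
    push.put(None)   # only now is it safe to propagate the sentinel

if __name__ == '__main__':
    a, b = mp.Queue(), mp.Queue()
    for x in [1, 2, 3, 4, 5, None]:
        a.put(x)
    stage(a, b)      # doubles the values
    for val in iter(b.get, None):
        print(val)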

So ...

I have implemented a solution following Idea 2, feeding and waiting for the results to arrive, but it was not really a pipeline of independent functions plugged together. It worked for the task I had to manage, but it was hard to maintain.

I'd like to hear how you usually implement and manage pipelines (easy in a single process with generator functions and so on, but how with multiple processes?).

asked Nov 26 '11 by wal-o-mat


2 Answers

With the MPipe module, simply do this:

from mpipe import OrderedStage, Pipeline

def f1(value):
    return value * 2

def f2(value):
    print(value)

s1 = OrderedStage(f1, size=2)   # first stage: two worker processes
s2 = OrderedStage(f2)           # second stage: one worker process
p = Pipeline(s1.link(s2))

for task in 1, 2, 3, 4, 5, None:   # None signals the end of input
    p.put(task)

The above runs 4 processes:

  • two for the first stage (function f1)
  • one for the second stage (function f2)
  • and one more for the main program that feeds the pipeline.

The MPipe cookbook offers some explanation of how processes are shut down internally using None as the last task.

To run the code, install MPipe:

virtualenv venv
venv/bin/pip install mpipe
venv/bin/python prog.py

Output:

2
4
6
8
10
answered Oct 06 '22 by Velimir Mlaker

For Idea 1, how about:

import multiprocessing as mp

sentinel = None

def f2(inq, outq):
    # Double each value; stop when the sentinel arrives.
    while True:
        val = inq.get()
        if val is sentinel:
            break
        outq.put(val * 2)

def f3(outq):
    # Print each result; stop when the sentinel arrives.
    while True:
        val = outq.get()
        if val is sentinel:
            break
        print(val)

def f1():
    num_workers = 2
    inq = mp.Queue()
    outq = mp.Queue()
    for i in range(5):
        inq.put(i)
    for i in range(num_workers):
        inq.put(sentinel)        # one sentinel per worker
    workers = [mp.Process(target=f2, args=(inq, outq))
               for i in range(num_workers)]
    printer = mp.Process(target=f3, args=(outq,))
    for w in workers:
        w.start()
    printer.start()
    for w in workers:
        w.join()                 # wait until every worker has drained its input
    outq.put(sentinel)           # only now is it safe to stop the printer
    printer.join()

if __name__ == '__main__':
    f1()

The only difference from the description of Idea 1 is that f2 breaks out of the while-loop when it receives the sentinel (thus terminating itself). f1 blocks until the workers are done (using w.join()) and then sends f3 the sentinel (signaling it to break out of its while-loop).

answered Oct 06 '22 by unutbu