
multiprocessing.Pipe is even slower than multiprocessing.Queue?

Tags:

python

I tried to benchmark the speedup of Pipe over Queue from the multiprocessing package. I thought Pipe would be faster, as Queue uses Pipe internally.

Strangely, Pipe is slower than Queue when sending large numpy arrays. What am I missing here?

Pipe:

import sys
import time
from multiprocessing import Process, Pipe
import numpy as np

NUM = 1000


def worker(conn):
    for task_nbr in range(NUM):
        conn.send(np.random.rand(400, 400, 3))
    sys.exit(1)


def main():
    parent_conn, child_conn = Pipe(duplex=False)
    Process(target=worker, args=(child_conn,)).start()
    for num in range(NUM):
        message = parent_conn.recv()


if __name__ == "__main__":
    start_time = time.time()
    main()
    end_time = time.time()
    duration = end_time - start_time
    msg_per_sec = NUM / duration

    print "Duration: %s" % duration
    print "Messages Per Second: %s" % msg_per_sec

# Took 10.86s.

Queue:

import sys
import time
from multiprocessing import Process
from multiprocessing import Queue
import numpy as np

NUM = 1000

def worker(q):
    for task_nbr in range(NUM):
        q.put(np.random.rand(400, 400, 3))
    sys.exit(1)

def main():
    recv_q = Queue()
    Process(target=worker, args=(recv_q,)).start()
    for num in range(NUM):
        message = recv_q.get()

if __name__ == "__main__":
    start_time = time.time()
    main()
    end_time = time.time()
    duration = end_time - start_time
    msg_per_sec = NUM / duration

    print "Duration: %s" % duration
    print "Messages Per Second: %s" % msg_per_sec

# Took 6.86s.
zaxliu asked Jan 20 '18 07:01



3 Answers

You can do an experiment and put the following into your Pipe code above:

def worker(conn):
    for task_nbr in range(NUM):
        data = np.random.rand(400, 400, 3)
    sys.exit(1)

def main():
    parent_conn, child_conn = Pipe(duplex=False)
    p = Process(target=worker, args=(child_conn,))
    p.start()
    p.join()

This gives you the time that it takes to create the data for your test. On my system this takes about 2.9 seconds.
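For a sense of scale: np.random.rand(400, 400, 3) returns float64 values, so each message is 400 * 400 * 3 * 8 = 3,840,000 bytes, and 1000 messages move roughly 3.84 GB. A rough way to look at generation cost and serialization cost separately, with no pipe involved, is sketched below in Python 3. The names time_phases and MESSAGE_BYTES are made up for illustration, and os.urandom is only a stand-in payload of the same size so that the sketch needs no numpy; absolute numbers will differ from the real benchmark.

```python
import os
import pickle
import time

# Size of one np.random.rand(400, 400, 3) array: 8 bytes per float64 element.
MESSAGE_BYTES = 400 * 400 * 3 * 8


def time_phases(num_messages=20, payload_bytes=MESSAGE_BYTES):
    """Return (generate_seconds, pickle_seconds) summed over num_messages payloads."""
    generate = 0.0
    serialize = 0.0
    for _ in range(num_messages):
        t0 = time.perf_counter()
        payload = os.urandom(payload_bytes)  # assumption: stand-in for the numpy array
        t1 = time.perf_counter()
        pickle.dumps(payload, protocol=pickle.HIGHEST_PROTOCOL)  # result discarded
        t2 = time.perf_counter()
        generate += t1 - t0
        serialize += t2 - t1
    return generate, serialize


if __name__ == "__main__":
    gen, ser = time_phases()
    print("generate: %.3fs, pickle: %.3fs" % (gen, ser))
```

Whatever time is left over in the full benchmark after these two phases is mostly the actual transfer and the receiving side.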

Under the hood, the Queue object implements a buffer and a threaded send. The thread is still in the same process, but because of it the data creation doesn't have to wait for the system I/O to complete. This effectively parallelizes the two operations. Try your Pipe code modified with some simple threading (disclaimer: the code here is for testing only and is not production ready):

import sys
import time
import threading
from multiprocessing import Process, Pipe, Lock
import numpy as np
import copy

NUM = 1000

def worker(conn):
    _conn = conn
    _buf = []
    _wlock = Lock()
    _sentinel = object() # signal that we're done
    def thread_worker():
        while 1:
            if _buf:
                _wlock.acquire()
                obj = _buf.pop(0)
                _wlock.release()    # release before any exit path or slow I/O
                if obj is _sentinel: return
                _conn.send(obj)     # send the popped item, not the outer `data`
    t = threading.Thread(target=thread_worker)
    t.start()
    for task_nbr in range(NUM):
        data = np.random.rand(400, 400, 3)
        data[0][0][0] = task_nbr    # just for integrity check
        _wlock.acquire()
        _buf.append(data)
        _wlock.release()
    _wlock.acquire()
    _buf.append(_sentinel)
    _wlock.release()
    t.join()
    sys.exit(1)

def main():
    parent_conn, child_conn = Pipe(duplex=False)
    Process(target=worker, args=(child_conn,)).start()
    for num in range(NUM):
        message = parent_conn.recv()
        assert num == message[0][0][0], 'Data was corrupted'        

if __name__ == "__main__":
    start_time = time.time()
    main()
    end_time = time.time()
    duration = end_time - start_time
    msg_per_sec = NUM / duration

    print "Duration: %s" % duration
    print "Messages Per Second: %s" % msg_per_sec

On my machine this takes 3.4 seconds to run, which is almost exactly the same as your Queue code above.

From https://docs.python.org/2/library/threading.html:

In CPython, due to the Global Interpreter Lock, only one thread can execute Python code at once... however, threading is still an appropriate model if you want to run multiple I/O-bound tasks simultaneously.

The queue and pipe differences are definitely an odd implementation detail until you dig into it a bit.
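For anyone who wants a tidier version of the same idea, here is a minimal Python 3 sketch of a buffer plus feeder thread in front of a Pipe connection. BufferedSender and demo are hypothetical names of my own, not part of multiprocessing, and this is only an approximation of the pattern Queue uses internally, not its actual code. It uses a threading.Condition instead of a busy-wait loop, and it runs both ends in one process just to keep the example short.

```python
import collections
import threading
from multiprocessing import Pipe


class BufferedSender:
    """Hypothetical helper: buffer items locally and send them from a background thread."""

    _SENTINEL = object()  # marks the end of the stream

    def __init__(self, conn):
        self._conn = conn
        self._buffer = collections.deque()
        self._not_empty = threading.Condition()
        self._thread = threading.Thread(target=self._feed, daemon=True)
        self._thread.start()

    def put(self, obj):
        # Returns as soon as the item is buffered; the pipe write happens later.
        with self._not_empty:
            self._buffer.append(obj)
            self._not_empty.notify()

    def close(self):
        # Let the feeder drain everything that was buffered, then wait for it to stop.
        self.put(self._SENTINEL)
        self._thread.join()

    def _feed(self):
        while True:
            with self._not_empty:
                while not self._buffer:
                    self._not_empty.wait()
                obj = self._buffer.popleft()
            if obj is self._SENTINEL:
                return
            # The lock is not held here, so put() is never blocked by pipe I/O.
            self._conn.send(obj)


def demo(count=5):
    recv_end, send_end = Pipe(duplex=False)
    sender = BufferedSender(send_end)
    for i in range(count):
        sender.put({"task": i})
    received = [recv_end.recv() for _ in range(count)]
    sender.close()
    return received


if __name__ == "__main__":
    print(demo())
```

In the real benchmark the BufferedSender would live in the worker process and the recv() loop in the parent, so the producer can generate the next array while the previous one is still being written to the pipe.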

bivouac0 answered Nov 15 '22 21:11


On my system, Pipe(duplex=False) is slower (twice the time, or half the rate) than Pipe(duplex=True). For anyone looking for performance, here is a side-by-side comparison:

from time import time
from multiprocessing import Process, Queue, Pipe

n = 1000
buffer = b'\0' * (1000*1000) # 1 megabyte

def print_elapsed(name, start):
    elapsed = time() - start
    spi = elapsed / n
    ips = n / elapsed
    print(f'{name}: {spi*1000:.3f} ms/item, {ips:.0f} item/sec')

def producer(q):
    start = time()
    for i in range(n):
        q.put(buffer)
    print_elapsed('producer', start)

def consumer(q):
    start = time()
    for i in range(n):
        out = q.get()
    print_elapsed('consumer', start)

class PipeQueue():
    def __init__(self, **kwargs):
        self.out_pipe, self.in_pipe = Pipe(**kwargs)
    def put(self, item):
        self.in_pipe.send_bytes(item)
    def get(self):
        return self.out_pipe.recv_bytes()
    def close(self):
        self.out_pipe.close()
        self.in_pipe.close()

print('duplex=True')
q = PipeQueue(duplex=True)
producer_process = Process(target=producer, args=(q,))
consumer_process = Process(target=consumer, args=(q,))
consumer_process.start()
producer_process.start()
consumer_process.join()
producer_process.join()
q.close()

print('duplex=False')
q = PipeQueue(duplex=False)
producer_process = Process(target=producer, args=(q,))
consumer_process = Process(target=consumer, args=(q,))
consumer_process.start()
producer_process.start()
consumer_process.join()
producer_process.join()
q.close()

Results:

duplex=True
consumer: 0.301 ms/item, 3317 item/sec
producer: 0.298 ms/item, 3358 item/sec
duplex=False
consumer: 0.673 ms/item, 1486 item/sec
producer: 0.669 ms/item, 1494 item/sec

I think this must come down to CPython (on Unix) using os.pipe for duplex=False vs socket.socketpair for duplex=True, but I'm not sure.
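One way to check which OS primitive sits behind each kind of Pipe, at least on a Unix-like system, is to fstat the connection's file descriptor. This is a small sketch under that assumption; describe and pipe_kinds are names made up here, and it will not work on Windows, where connections wrap handles rather than file descriptors. It only shows what the connections are, not why one is faster.

```python
import os
import stat
from multiprocessing import Pipe


def describe(conn):
    """Classify the file descriptor behind a multiprocessing Connection (Unix only)."""
    mode = os.fstat(conn.fileno()).st_mode
    if stat.S_ISSOCK(mode):
        return "socket"
    if stat.S_ISFIFO(mode):
        return "pipe"
    return "other"


def pipe_kinds():
    """Return the descriptor type used for duplex=True and duplex=False pipes."""
    duplex_a, duplex_b = Pipe(duplex=True)
    recv_end, send_end = Pipe(duplex=False)
    try:
        return {
            "duplex=True": describe(duplex_a),
            "duplex=False": describe(recv_end),
        }
    finally:
        for conn in (duplex_a, duplex_b, recv_end, send_end):
            conn.close()


if __name__ == "__main__":
    for name, kind in pipe_kinds().items():
        print(name, "->", kind)
```

On Linux with CPython I would expect this to report a socket for duplex=True and a pipe for duplex=False, which at least confirms that the two cases go through different kernel code paths.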

Kyle McDonald answered Nov 15 '22 22:11


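I assume from your print statements that you are using Python 2. However, the strange behavior cannot be replicated with Python 3, where Pipe is actually faster than Queue. Note that the test below uses smaller arrays (40x40x3) and more messages (20000) than the code in the question.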

import sys
import time
from multiprocessing import Process, Pipe, Queue
import numpy as np

NUM = 20000


def worker_pipe(conn):
    for task_nbr in range(NUM):
        conn.send(np.random.rand(40, 40, 3))
    sys.exit(1)


def main_pipe():
    parent_conn, child_conn = Pipe(duplex=False)
    Process(target=worker_pipe, args=(child_conn,)).start()
    for num in range(NUM):
        message = parent_conn.recv()


def pipe_test():
    start_time = time.time()
    main_pipe()
    end_time = time.time()
    duration = end_time - start_time
    msg_per_sec = NUM / duration
    print("Pipe")
    print("Duration: " + str(duration))
    print("Messages Per Second: " + str(msg_per_sec))

def worker_queue(q):
    for task_nbr in range(NUM):
        q.put(np.random.rand(40, 40, 3))
    sys.exit(1)

def main_queue():
    recv_q = Queue()
    Process(target=worker_queue, args=(recv_q,)).start()
    for num in range(NUM):
        message = recv_q.get()

def queue_test():
    start_time = time.time()
    main_queue()
    end_time = time.time()
    duration = end_time - start_time
    msg_per_sec = NUM / duration
    print("Queue")
    print("Duration: " + str(duration))
    print("Messages Per Second: " + str(msg_per_sec))


if __name__ == "__main__":
    for i in range(2):
        queue_test()
        pipe_test()

Results in:

Queue
Duration: 3.44321894646
Messages Per Second: 5808.51822408
Pipe
Duration: 2.69065594673
Messages Per Second: 7433.13169575
Queue
Duration: 3.45295906067
Messages Per Second: 5792.13354361
Pipe
Duration: 2.78426194191
Messages Per Second: 7183.23218766


Fabio Veronese answered Nov 15 '22 20:11