Python 3.4 multiprocessing Queue faster than Pipe, unexpected

I am writing an audio player that receives samples from a UDP socket, and everything was working fine. But when I implemented a packet loss concealment algorithm, the player failed to keep producing silence at the expected rate (every 10 ms it should send a list of several 160-byte chunks).

When playing audio with PyAudio, using the blocking write call to play samples, I noticed it blocked on average for the duration of the sample. So I created a dedicated process just for playing the samples.

The main process prepares the output audio stream and sends the result to that process through a multiprocessing.Pipe. I chose multiprocessing.Pipe because it was supposed to be faster than the alternatives.
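
For context, here is a minimal sketch of that layout (not the actual player code; the 8 kHz, 16-bit mono format is my assumption, chosen only because it makes 160 bytes correspond to 10 ms of audio):

from multiprocessing import Process, Pipe
import pyaudio


def playback_proc(conn):
    # Dedicated playback process: pull chunks from the Pipe and play them.
    pa = pyaudio.PyAudio()
    stream = pa.open(format=pyaudio.paInt16, channels=1, rate=8000, output=True)
    while True:
        chunk = conn.recv()      # blocks until the main process sends a chunk
        if chunk is None:        # sentinel: stop playing
            break
        stream.write(chunk)      # blocking write, roughly the chunk's duration
    stream.stop_stream()
    stream.close()
    pa.terminate()


if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    player = Process(target=playback_proc, args=(child_conn,))
    player.start()
    for _ in range(100):         # one second of silence, 10 ms at a time
        parent_conn.send(b'\x00' * 160)
    parent_conn.send(None)
    player.join()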

Unfortunately, when I ran the program on a virtual machine, the bitrate was half of what I was getting on my fast PC, which had no trouble meeting the target bitrate.

After some tests, I concluded that the delay was caused by the Pipe's send function.

I wrote a simple benchmark script (see below) to compare the various ways of transmitting data to another process. The script keeps sending [b'\x00'*160] for 5 seconds and counts how many bytes of the bytes object were sent in total. I tested the following methods: "not sending", multiprocessing.Pipe, multiprocessing.Queue, multiprocessing.Manager, a multiprocessing.Listener/Client pair over a TCP socket, and finally a Listener/Client pair over a Windows named pipe:

Results for my "fast" PC running Windows 7 x64:

test_empty     :     1516076640
test_pipe      :       58155840
test_queue     :      233946880
test_manager   :        2853440
test_socket    :       55696160
test_named_pipe:       58363040

Results for a VirtualBox VM guest running Windows 7 x64 (host also running Windows 7 x64):

test_empty     :     1462706080
test_pipe      :       32444160
test_queue     :      204845600
test_manager   :         882560
test_socket    :       20549280
test_named_pipe:       35387840  

Script used:

from multiprocessing import Process, Pipe, Queue, Manager
from multiprocessing.connection import Client, Listener
import time

FS = "{:<15}:{:>15}"


def test_empty():
    s = time.time()
    sent = 0
    while True:
        data = b'\x00'*160
        lst = [data]

        sent += len(data)
        if time.time()-s >= 5:
            break
    print(FS.format("test_empty", sent))


def pipe_void(pipe_in):
    while True:
        msg = pipe_in.recv()
        if msg == []:
            break


def test_pipe():
    pipe_out, pipe_in = Pipe()
    p = Process(target=pipe_void, args=(pipe_in,))
    p.start()
    s = time.time()
    sent = 0
    while True:
        data = b'\x00'*160
        lst = [data]
        pipe_out.send(lst)
        sent += len(data)
        if time.time()-s >= 5:
            break
    pipe_out.send([])
    p.join()
    print(FS.format("test_pipe", sent))


def queue_void(q):
    while True:
        msg = q.get()
        if msg == []:
            break


def test_queue():
    q = Queue()
    p = Process(target=queue_void, args=(q,))
    p.start()
    s = time.time()
    sent = 0
    while True:
        data = b'\x00'*160
        lst = [data]
        q.put(lst)
        sent += len(data)
        if time.time()-s >= 5:
            break
    q.put([])
    p.join()

    print(FS.format("test_queue", sent))


def manager_void(l, lock):
    msg = None
    while True:
        with lock:
            if len(l) > 0:
                msg = l.pop(0)
        if msg == []:
            break


def test_manager():
    with Manager() as manager:
        l = manager.list()
        lock = manager.Lock()
        p = Process(target=manager_void, args=(l, lock))
        p.start()
        s = time.time()
        sent = 0
        while True:
            data = b'\x00'*160
            lst = [data]
            with lock:
                l.append(lst)
            sent += len(data)
            if time.time()-s >= 5:
                break
        with lock:
            l.append([])
        p.join()

        print(FS.format("test_manager", sent))


def socket_void():
    addr = ('127.0.0.1', 20000)
    conn = Client(addr)
    while True:
        msg = conn.recv()
        if msg == []:
            break


def test_socket():
    addr = ('127.0.0.1', 20000)
    listener = Listener(addr, "AF_INET")
    p = Process(target=socket_void)
    p.start()
    conn = listener.accept()
    s = time.time()
    sent = 0
    while True:
        data = b'\x00'*160
        lst = [data]
        conn.send(lst)
        sent += len(data)
        if time.time()-s >= 5:
            break
    conn.send([])
    p.join()

    print(FS.format("test_socket", sent))


def named_pipe_void():
    addr = '\\\\.\\pipe\\Test'
    conn = Client(addr)
    while True:
        msg = conn.recv()
        if msg == []:
            break


def test_named_pipe():
    addr = '\\\\.\\pipe\\Test'
    listener = Listener(addr, "AF_PIPE")
    p = Process(target=named_pipe_void)
    p.start()
    conn = listener.accept()
    s = time.time()
    sent = 0
    while True:
        data = b'\x00'*160
        lst = [data]
        conn.send(lst)
        sent += len(data)
        if time.time()-s >= 5:
            break
    conn.send([])
    p.join()

    print(FS.format("test_named_pipe", sent))


if __name__ == "__main__":
    test_empty()
    test_pipe()
    test_queue()
    test_manager()
    test_socket()
    test_named_pipe()

Question

  • If Queue uses a Pipe internally, how can it be faster than a Pipe in this context? This seems to contradict Python multiprocessing - Pipe vs Queue.
  • How can I guarantee a constant-bitrate stream from one process to another while keeping the send delay low?

Update 1

Inside my program, after switching from Pipes to Queues, I got an enormous boost.

On my computer, using Pipes I got about 16,000 B/s; using Queues I got about 7.5 million B/s. On the virtual machine I went from about 13,000 B/s to about 6.5 million B/s. That is roughly 500 times more bytes using a Queue instead of a Pipe.

Of course I won't be playing millions of bytes per second; I will only be playing at the normal rate for sound (in my case 16,000 B/s, coincidentally the same as the Pipe figure above).
But the point is that I can limit the rate to whatever I want, while still having time to finish other computations (such as receiving from sockets, applying sound algorithms, etc.).
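
To illustrate the "limit the rate to what I want" part, here is a sketch (not the poster's actual program; the helper names are mine) that paces Queue.put() against an absolute deadline, sending 160 bytes every 10 ms so scheduling jitter does not accumulate into drift:

import time
from multiprocessing import Process, Queue


def consume(q):
    # Stand-in for the playback process: drain chunks until the sentinel.
    while q.get() is not None:
        pass


def paced_send(q, period=0.010, duration=5.0):
    # Send 160 bytes every `period` seconds against an absolute deadline.
    deadline = time.monotonic()
    end = deadline + duration
    sent = 0
    while deadline < end:
        q.put(b'\x00' * 160)       # returns almost immediately
        sent += 160
        deadline += period
        remaining = deadline - time.monotonic()
        if remaining > 0:
            time.sleep(remaining)  # wait out the rest of the 10 ms slot
    q.put(None)                    # sentinel tells the consumer to stop
    print("bytes per second:", sent / duration)


if __name__ == "__main__":
    q = Queue()
    p = Process(target=consume, args=(q,))
    p.start()
    paced_send(q)
    p.join()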

Asked Nov 13 '14 by Rui Botelho


1 Answer

I can't say for sure, but I think the issue you're dealing with is synchronous versus asynchronous I/O. My guess is that the Pipe somehow ends up behaving synchronously while the Queue ends up behaving asynchronously. Why exactly one defaults one way and the other the other way may be better answered by this question and answer:

Synchronous/Asynchronous behaviour of python Pipes
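
A small sketch (my addition, not part of the original answer) that makes the buffering difference visible: with a deliberately slow consumer, Connection.send() eventually stalls once the OS pipe buffer fills, while Queue.put() keeps returning quickly because, per the multiprocessing docs, a background feeder thread transfers queued objects into the underlying pipe. The reader functions below are illustrative only:

import time
from multiprocessing import Process, Pipe, Queue


def slow_pipe_reader(conn):
    # Deliberately slow consumer: ~1 ms of "work" per received chunk.
    for _ in range(1000):
        conn.recv()
        time.sleep(0.001)


def slow_queue_reader(q):
    for _ in range(1000):
        q.get()
        time.sleep(0.001)


if __name__ == "__main__":
    parent, child = Pipe()
    p = Process(target=slow_pipe_reader, args=(child,))
    p.start()
    t = time.monotonic()
    for _ in range(1000):
        parent.send(b'\x00' * 160)   # stalls once the OS pipe buffer is full
    print("1000 pipe sends took :", time.monotonic() - t)
    p.join()

    q = Queue()
    p = Process(target=slow_queue_reader, args=(q,))
    p.start()
    t = time.monotonic()
    for _ in range(1000):
        q.put(b'\x00' * 160)         # buffered locally; the feeder thread drains it
    print("1000 queue puts took :", time.monotonic() - t)
    p.join()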

Answered Sep 24 '22 by Mike Sandford