
Can I use asyncio to read from and write to a multiprocessing.Pipe?

I need to communicate between processes in Python and am using asyncio in each of the processes for concurrent network IO.

Currently I'm using multiprocessing.Pipe to send and recv significantly large amounts of data between the processes, however I do so outside of asyncio and I believe I'm spending a lot of cpu time in IO_WAIT because of it.

It seems like asyncio can and should be used to handle the Pipe IO between processes, however I can't find an example for anything but piping STDIN/STDOUT.

From what I read it seems like I should register the pipe with loop.connect_read_pipe(PROTOCOL_FACTORY, PIPE) and likewise for write. However I don't understand the purpose of protocol_factory as it would relate to a multiprocessing.Pipe. It's not even clear if I should be creating a multiprocessing.Pipe or whether I can create a pipe within asyncio.

asked Nov 05 '19 by David Parks


People also ask

Can I use Asyncio with multiprocessing?

asyncio has an API for interoperating with Python's multiprocessing library. This lets us use async/await syntax, as well as asyncio APIs, with multiple processes.
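
For example, the usual bridge between the two is loop.run_in_executor() with a concurrent.futures.ProcessPoolExecutor. A minimal sketch (cpu_bound() is just an illustrative placeholder, not from the question above):

import asyncio
from concurrent.futures import ProcessPoolExecutor


def cpu_bound(n):
    # Runs in a separate worker process.
    return sum(i * i for i in range(n))


async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # Await the result of the process-pool job without blocking the loop.
        result = await loop.run_in_executor(pool, cpu_bound, 10_000_000)
        print(result)


if __name__ == '__main__':
    asyncio.run(main())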

Is Asyncio better than threading?

One of the cool advantages of asyncio is that it scales far better than threading. Each task takes far fewer resources and less time to create than a thread, so creating and running more of them works well. Creating a separate task for each site to download, for example, works out quite well.
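
As a rough sketch of that pattern (fetch() here is a stand-in for real network IO, such as an aiohttp request):

import asyncio


async def fetch(url):
    # Placeholder for real network IO.
    await asyncio.sleep(0.1)
    return f"downloaded {url}"


async def main():
    urls = ["https://example.com/a", "https://example.com/b"]
    # gather() wraps each coroutine in a task and runs them concurrently.
    results = await asyncio.gather(*(fetch(url) for url in urls))
    print(results)


if __name__ == '__main__':
    asyncio.run(main())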

When should I use Asyncio Python?

asyncio is used as a foundation for multiple Python asynchronous frameworks that provide high-performance network and web-servers, database connection libraries, distributed task queues, etc. asyncio is often a perfect fit for IO-bound and high-level structured network code.

Why should I use Asyncio?

asyncio essentially gives you significantly more control over concurrency, at the cost of having to manage that concurrency yourself.


2 Answers

multiprocessing.Pipe returns high-level multiprocessing.connection.Connection objects, which pickle and unpickle Python objects and transmit additional framing bytes under the hood. If you wanted to read data from one of these pipes using loop.connect_read_pipe(), you would have to re-implement all of this yourself.
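
Purely as an illustration (a minimal sketch, not part of the answer's code): Connection objects round-trip arbitrary Python objects precisely because send() pickles them and adds framing bytes under the hood, which is what a raw connect_read_pipe() reader would have to re-implement.

import multiprocessing

read, write = multiprocessing.Pipe(duplex=False)
write.send({'key': [1, 2, 3]})  # pickled and framed under the hood
print(read.recv())              # -> {'key': [1, 2, 3]}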

The easiest way to read from a multiprocessing.Pipe without blocking the event loop would be to use loop.add_reader(). Consider the following example:

import asyncio
import multiprocessing


def main():
    read, write = multiprocessing.Pipe(duplex=False)
    writer_process = multiprocessing.Process(target=writer, args=(write,))
    writer_process.start()
    asyncio.get_event_loop().run_until_complete(reader(read))


async def reader(read):
    data_available = asyncio.Event()
    # Set the event whenever the pipe's file descriptor becomes readable.
    asyncio.get_event_loop().add_reader(read.fileno(), data_available.set)

    # Only wait if nothing has arrived on the Connection yet.
    if not read.poll():
        await data_available.wait()

    print(read.recv())
    data_available.clear()


def writer(write):
    write.send('Hello World')


if __name__ == '__main__':
    main()

Pipes created using the lower-level os.pipe don't add anything extra the way that pipes from multiprocessing.Pipe do. As a result, we can use os.pipe with loop.connect_read_pipe() without re-implementing any of those inner workings. Here is an example:

import asyncio
import multiprocessing
import os


def main():
    read, write = os.pipe()
    writer_process = multiprocessing.Process(target=writer, args=(write,))
    writer_process.start()
    asyncio.get_event_loop().run_until_complete(reader(read))


async def reader(read):
    # Wrap the raw read file descriptor in a file object, as required
    # by loop.connect_read_pipe().
    pipe = os.fdopen(read, mode='r')

    loop = asyncio.get_event_loop()
    stream_reader = asyncio.StreamReader()

    def protocol_factory():
        return asyncio.StreamReaderProtocol(stream_reader)

    transport, _ = await loop.connect_read_pipe(protocol_factory, pipe)
    print(await stream_reader.readline())
    transport.close()


def writer(write):
    os.write(write, b'Hello World\n')


if __name__ == '__main__':
    main()

This code helped me figure out how to use loop.connect_read_pipe.

answered Sep 22 '22 by akindofyoga


aiopipe seems to do what you want! It can be used with the built-in multiprocessing module, and it provides an API similar to regular blocking pipes.
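
A hedged sketch of how that might look, based on the aiopipe README (the aiopipe(), detach(), and open() names are that library's API as I recall it; treat the exact names and semantics as an assumption and check the project documentation):

import asyncio
import multiprocessing

from aiopipe import aiopipe


async def main():
    rx, tx = aiopipe()

    # detach() keeps the write end usable in the child process.
    with tx.detach() as tx:
        proc = multiprocessing.Process(target=child, args=(tx,))
        proc.start()

    # open() yields a regular asyncio StreamReader for the read end.
    async with rx.open() as rx_stream:
        print(await rx_stream.readline())

    proc.join()


def child(tx):
    asyncio.run(child_task(tx))


async def child_task(tx):
    # open() yields an asyncio StreamWriter for the write end.
    async with tx.open() as tx_stream:
        tx_stream.write(b'Hello World\n')
        await tx_stream.drain()


if __name__ == '__main__':
    asyncio.run(main())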

answered Sep 18 '22 by kmaork