Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

What kind of problems (if any) would there be combining asyncio with multiprocessing?

As almost everyone is aware when they first look at threading in Python, there is the GIL that makes life miserable for people who actually want to do processing in parallel - or at least give it a chance.

I am currently looking at implementing something like the Reactor pattern. Effectively I want to listen for incoming socket connections on one thread-like, and when someone tries to connect, accept that connection and pass it along to another thread-like for processing.

I'm not (yet) sure what kind of load I might be facing. I know there is currently setup a 2MB cap on incoming messages. Theoretically we could get thousands per second (though I don't know if practically we've seen anything like that). The amount of time spent processing a message isn't terribly important, though obviously quicker would be better.

I was looking into the Reactor pattern, and developed a small example using the multiprocessing library that (at least in testing) seems to work just fine. However, now/soon we'll have the asyncio library available, which would handle the event loop for me.

Is there anything that could bite me by combining asyncio and multiprocessing?

like image 613
Wayne Werner Avatar asked Jan 16 '14 10:01

Wayne Werner


People also ask

Can I use async IO with multiprocessing?

asyncio has an API for interoperating with Python's multiprocessing library. This lets us use async await syntax as well as asyncio APIs with multiple processes.

Does async IO use multithreading?

Instead of using Python threads to run instructions concurrently, asyncio uses an event loop to schedule instructions on the main thread. Contrasted with threads, asyncio coroutines may never be interrupted unless they explicitly yield the thread with async or await keywords.

Is async IO faster than threading?

One of the cool advantages of asyncio is that it scales far better than threading . Each task takes far fewer resources and less time to create than a thread, so creating and running more of them works well. This example just creates a separate task for each site to download, which works out quite well.

Does async IO use multiple cores?

asyncio, coroutines, greenlets, etc. have no direct effect on performance and do not help using multiple cores efficiently; asynchronous systems may lead to slower performance (ex.


3 Answers

You should be able to safely combine asyncio and multiprocessing without too much trouble, though you shouldn't be using multiprocessing directly. The cardinal sin of asyncio (and any other event-loop based asynchronous framework) is blocking the event loop. If you try to use multiprocessing directly, any time you block to wait for a child process, you're going to block the event loop. Obviously, this is bad.

The simplest way to avoid this is to use BaseEventLoop.run_in_executor to execute a function in a concurrent.futures.ProcessPoolExecutor. ProcessPoolExecutor is a process pool implemented using multiprocessing.Process, but asyncio has built-in support for executing a function in it without blocking the event loop. Here's a simple example:

import time
import asyncio
from concurrent.futures import ProcessPoolExecutor

def blocking_func(x):
   time.sleep(x) # Pretend this is expensive calculations
   return x * 5

@asyncio.coroutine
def main():
    #pool = multiprocessing.Pool()
    #out = pool.apply(blocking_func, args=(10,)) # This blocks the event loop.
    executor = ProcessPoolExecutor()
    out = yield from loop.run_in_executor(executor, blocking_func, 10)  # This does not
    print(out)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

For the majority of cases, this is function alone is good enough. If you find yourself needing other constructs from multiprocessing, like Queue, Event, Manager, etc., there is a third-party library called aioprocessing (full disclosure: I wrote it), that provides asyncio-compatible versions of all the multiprocessing data structures. Here's an example demoing that:

import time
import asyncio
import aioprocessing
import multiprocessing

def func(queue, event, lock, items):
    with lock:
        event.set()
        for item in items:
            time.sleep(3)
            queue.put(item+5)
    queue.close()

@asyncio.coroutine
def example(queue, event, lock):
    l = [1,2,3,4,5]
    p = aioprocessing.AioProcess(target=func, args=(queue, event, lock, l)) 
    p.start()
    while True:
        result = yield from queue.coro_get()
        if result is None:
            break
        print("Got result {}".format(result))
    yield from p.coro_join()

@asyncio.coroutine
def example2(queue, event, lock):
    yield from event.coro_wait()
    with (yield from lock):
        yield from queue.coro_put(78)
        yield from queue.coro_put(None) # Shut down the worker

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    queue = aioprocessing.AioQueue()
    lock = aioprocessing.AioLock()
    event = aioprocessing.AioEvent()
    tasks = [ 
        asyncio.async(example(queue, event, lock)),
        asyncio.async(example2(queue, event, lock)),
    ]   
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
like image 143
dano Avatar answered Oct 23 '22 09:10

dano


Yes, there are quite a few bits that may (or may not) bite you.

  • When you run something like asyncio it expects to run on one thread or process. This does not (by itself) work with parallel processing. You somehow have to distribute the work while leaving the IO operations (specifically those on sockets) in a single thread/process.
  • While your idea to hand off individual connections to a different handler process is nice, it is hard to implement. The first obstacle is that you need a way to pull the connection out of asyncio without closing it. The next obstacle is that you cannot simply send a file descriptor to a different process unless you use platform-specific (probably Linux) code from a C-extension.
  • Note that the multiprocessing module is known to create a number of threads for communication. Most of the time when you use communication structures (such as Queues), a thread is spawned. Unfortunately those threads are not completely invisible. For instance they can fail to tear down cleanly (when you intend to terminate your program), but depending on their number the resource usage may be noticeable on its own.

If you really intend to handle individual connections in individual processes, I suggest to examine different approaches. For instance you can put a socket into listen mode and then simultaneously accept connections from multiple worker processes in parallel. Once a worker is finished processing a request, it can go accept the next connection, so you still use less resources than forking a process for each connection. Spamassassin and Apache (mpm prefork) can use this worker model for instance. It might end up easier and more robust depending on your use case. Specifically you can make your workers die after serving a configured number of requests and be respawned by a master process thereby eliminating much of the negative effects of memory leaks.

like image 9
Helmut Grohne Avatar answered Oct 23 '22 07:10

Helmut Grohne


See PEP 3156, in particular the section on Thread interaction:

http://www.python.org/dev/peps/pep-3156/#thread-interaction

This documents clearly the new asyncio methods you might use, including run_in_executor(). Note that the Executor is defined in concurrent.futures, I suggest you also have a look there.

like image 1
Glenn Avatar answered Oct 23 '22 08:10

Glenn