How to combine python asyncio with threads?

I have successfully built a RESTful microservice with Python asyncio and aiohttp that listens for POST requests to collect real-time events from various feeders.

It then builds an in-memory structure to cache the last 24 hours of events in a nested defaultdict/deque.

Now I would like to periodically checkpoint that structure to disk, preferably using pickle.

Since the in-memory structure can be >100 MB, I would like to avoid holding up my incoming event processing for the time it takes to checkpoint the structure.

I'd rather create a snapshot copy (e.g. via deepcopy) of the structure, then take my time writing it to disk, repeating on a preset time interval.
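Roughly, the flow I have in mind is something like this (the names here are just placeholders):

    import copy
    import pickle

    def checkpoint(cache, path="cache.pickle"):
        snapshot = copy.deepcopy(cache)  # freeze a consistent view of the structure
        with open(path, "wb") as f:      # then write it out without time pressure
            pickle.dump(snapshot, f, pickle.HIGHEST_PROTOCOL)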

I have been searching for examples of how to combine threads and asyncio for this purpose (and is a thread even the best solution here?) but could not find anything that helped.

Any pointers to get started are much appreciated!

asked Feb 13 '15 by fxstein



1 Answer

It's pretty simple to delegate a method to a thread or sub-process using BaseEventLoop.run_in_executor:

    import asyncio
    import time
    from concurrent.futures import ProcessPoolExecutor

    def cpu_bound_operation(x):
        time.sleep(x)  # This is some operation that is CPU-bound

    @asyncio.coroutine
    def main():
        # Run cpu_bound_operation in the ProcessPoolExecutor.
        # This will make your coroutine block, but won't block
        # the event loop; other coroutines can run in the meantime.
        yield from loop.run_in_executor(p, cpu_bound_operation, 5)

    loop = asyncio.get_event_loop()
    p = ProcessPoolExecutor(2)  # Create a ProcessPool with 2 processes
    loop.run_until_complete(main())

As for whether to use a ProcessPoolExecutor or ThreadPoolExecutor, that's kind of hard to say; pickling a large object will definitely eat some CPU cycles, which initially would make you think ProcessPoolExecutor is the way to go. However, passing your 100MB object to a Process in the pool would require pickling the instance in your main process, sending the bytes to the child process via IPC, unpickling it in the child, and then pickling it again so you can write it to disk. Given that, my guess is the pickling/unpickling overhead will be large enough that you're better off using a ThreadPoolExecutor, even though you're going to take a performance hit because of the GIL.
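For instance, applied to your checkpointing use case, a minimal sketch might look like the following. It uses the same pre-3.5 coroutine style as the snippet above (on Python 3.5+ you would write async def/await), and the cache name, interval, and path are placeholders:

    import asyncio
    import copy
    import pickle
    from collections import defaultdict, deque
    from concurrent.futures import ThreadPoolExecutor

    cache = defaultdict(deque)  # stand-in for your nested defaultdict/deque structure

    def write_checkpoint(snapshot, path="cache.pickle"):
        # Runs in a worker thread, so the event loop keeps serving events.
        with open(path, "wb") as f:
            pickle.dump(snapshot, f, pickle.HIGHEST_PROTOCOL)

    @asyncio.coroutine
    def checkpoint_periodically(interval=300):
        while True:
            yield from asyncio.sleep(interval)
            # deepcopy runs on the event loop thread, so it briefly pauses
            # event handling; the slow pickle/write step does not.
            snapshot = copy.deepcopy(cache)
            yield from loop.run_in_executor(executor, write_checkpoint, snapshot)

    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor(1)
    loop.create_task(checkpoint_periodically())  # runs alongside your aiohttp handlers
    loop.run_forever()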

That said, it's very simple to test both ways and find out for sure, so you might as well do that.
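A quick, self-contained way to do that comparison (the payload below is just a stand-in for your real structure):

    import pickle
    import time
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

    def checkpoint(obj, path="bench.pickle"):
        with open(path, "wb") as f:
            pickle.dump(obj, f, pickle.HIGHEST_PROTOCOL)

    if __name__ == "__main__":  # guard needed for ProcessPoolExecutor on some platforms
        payload = {i: list(range(100)) for i in range(100000)}  # substitute your cache
        for executor_cls in (ThreadPoolExecutor, ProcessPoolExecutor):
            with executor_cls(1) as pool:
                start = time.perf_counter()
                pool.submit(checkpoint, payload).result()
                print(executor_cls.__name__, time.perf_counter() - start)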

answered Sep 20 '22 by dano