I am new to Python 3 and asyncio, coming from gevent and Python 2.7.
How do I create a global Redis connection that is shared by all workers? For example, one process will run about 10 asyncio "threads", and I don't want a separate connection for each of them. Why? I will have around 100 cores with 10 threads per core, and I don't want that many connections to Redis.
import asyncio
import asyncio_redis

async def worker():
    while True:
        data = await connection.brpop(['queue'], timeout=0)
        print(data)
        res = blocking_code(data)
        await connection.set('test',res)

#Process raw data here and all code is blocking
def blocking_code(data):
    results = {}
    return results

if __name__ == '__main__':
    connection = asyncio_redis.Connection.create(host='127.0.0.1', port=6379, poolsize=2)
    loop = asyncio.get_event_loop()
    tasks = [asyncio.ensure_future(worker()), asyncio.ensure_future(worker())]
    loop.run_until_complete(asyncio.gather(*tasks))
    connection.close()
Traceback (most recent call last):
  File "/Users//worker.py", line 14, in <module>
    loop.run_until_complete(example())
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 466, in run_until_complete
    return future.result()
  File "/Users//worker.py", line 7, in example
    data = yield from connection.brpop(['queue'], timeout=0)
AttributeError: 'generator' object has no attribute 'brpop'
So in the code above I have two tasks, but I want only one Redis connection.
10 asyncio threads
Just in case: asyncio coroutines all run in one thread. Concurrency is achieved by switching between coroutines while they wait on I/O operations.
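To make the single-thread point concrete, here is a minimal sketch using only the standard library. The names fake_io and run_demo are made up for illustration, and asyncio.sleep stands in for a network call. All three coroutines record the same thread id, and the total time is close to one delay rather than three.

```python
import asyncio
import threading
import time


async def fake_io(name, delay, seen_threads):
    # Record which OS thread is running this coroutine.
    seen_threads.add(threading.get_ident())
    # asyncio.sleep stands in for a network call; it hands control
    # back to the event loop so other coroutines can run meanwhile.
    await asyncio.sleep(delay)
    return name


async def run_demo():
    seen_threads = set()
    start = time.monotonic()
    results = await asyncio.gather(
        fake_io("a", 0.2, seen_threads),
        fake_io("b", 0.2, seen_threads),
        fake_io("c", 0.2, seen_threads),
    )
    elapsed = time.monotonic() - start
    return results, seen_threads, elapsed


if __name__ == "__main__":
    results, seen_threads, elapsed = asyncio.run(run_demo())
    print(results, "threads used:", len(seen_threads), "seconds:", round(elapsed, 1))
```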
asyncio_redis.Connection.create is a coroutine, so you should await it with yield from to get its result:
connection = yield from asyncio_redis.Connection.create(host='127.0.0.1', port=6379)
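The AttributeError in the question comes from using the un-awaited coroutine object as if it were the connection. Below is a small stdlib-only sketch of that mistake and the fix. FakeConnection and create_connection are hypothetical stand-ins, not part of asyncio_redis. Because the stand-in is defined with async def, the message names a 'coroutine' object rather than the 'generator' seen in the traceback, but the cause is the same.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a Redis connection object."""

    async def brpop(self, keys, timeout=0):
        return keys[0], "payload"


async def create_connection():
    # Stand-in for a coroutine factory such as Connection.create().
    await asyncio.sleep(0)
    return FakeConnection()


async def broken():
    # Missing await: this is a coroutine object, not a connection.
    connection = create_connection()
    try:
        return await connection.brpop(["queue"])
    except AttributeError as exc:
        return exc
    finally:
        connection.close()  # close the unused coroutine to avoid a warning


async def fixed():
    # Awaiting the factory yields the actual connection object.
    connection = await create_connection()
    return await connection.brpop(["queue"])


if __name__ == "__main__":
    print(repr(asyncio.run(broken())))
    print(asyncio.run(fixed()))
```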
If you have only one connection, you'll probably get no benefit from using asyncio. Concurrent requests need a pool of connections to draw from. asyncio_redis has an easy way to set one up, for example:
import asyncio
import asyncio_redis

@asyncio.coroutine
def main():
    connection = yield from asyncio_redis.Pool.create(host='127.0.0.1', port=6379, poolsize=10)
    try:
        # 3 requests running concurrently in single thread using connections from pool:
        yield from asyncio.gather(
            connection.brpop(['queue:pixel'], timeout=0),
            connection.brpop(['queue:pixel'], timeout=0),
            connection.brpop(['queue:pixel'], timeout=0),
        )
    finally:
        connection.close()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()
If you're working with Python 3.5+, consider using the newer async/await syntax for defining and awaiting coroutines.
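As a rough sketch of the newer syntax, the same shape can be written with async def and await. FakePool below is a hypothetical in-memory stand-in so the snippet runs without a Redis server; it is not the asyncio_redis API. Note that the @asyncio.coroutine decorator was removed in Python 3.11, so on current versions this form is the only option.

```python
import asyncio
from collections import deque


class FakePool:
    """Hypothetical in-memory stand-in for a connection pool."""

    def __init__(self, items):
        self._items = deque(items)
        self.closed = False

    async def brpop(self, keys, timeout=0):
        # Yield to the event loop once, as a real network call would.
        await asyncio.sleep(0)
        return keys[0], self._items.popleft()

    def close(self):
        self.closed = True


async def main(pool):
    try:
        # Three requests awaited concurrently in a single thread.
        return await asyncio.gather(
            pool.brpop(["queue:pixel"]),
            pool.brpop(["queue:pixel"]),
            pool.brpop(["queue:pixel"]),
        )
    finally:
        pool.close()


if __name__ == "__main__":
    print(asyncio.run(main(FakePool(["a", "b", "c"]))))
```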
Upd:
Blocking code (for example, code that needs a lot of CPU time) can't be used inside coroutines directly: it will freeze your event loop and you'll get no benefit from asyncio. This is not related to the number of connections.
You can use run_in_executor to run this code in a separate process without blocking the event loop:
import asyncio
from concurrent.futures import ProcessPoolExecutor

executor = ProcessPoolExecutor(max_workers=10)  # use number of cores here

# connection and blocking_code are the ones defined in the question
async def worker():
    while True:
        data = await connection.brpop(['queue'], timeout=0)
        print(data)
        # await blocking_code from separate process:
        loop = asyncio.get_event_loop()
        result = await loop.run_in_executor(executor, blocking_code, data)
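For something you can run without Redis, here is a self-contained sketch of the same pattern. It assumes an asyncio.Queue as a stand-in for the Redis list and a None sentinel to stop the workers; run_workers and the toy blocking_code body are illustrative, not from the question. The workers share one queue object, much as they would share one pool.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def blocking_code(data):
    # Stand-in for CPU-heavy work. It is a top-level function so that
    # it can be pickled and sent to a worker process.
    return sum(i * i for i in range(data))


async def worker(queue, executor, results):
    loop = asyncio.get_running_loop()
    while True:
        data = await queue.get()
        if data is None:  # sentinel: no more work for this worker
            break
        # The event loop stays free while the executor runs blocking_code.
        results.append(await loop.run_in_executor(executor, blocking_code, data))


async def run_workers(executor, items, n_workers=2):
    queue = asyncio.Queue()  # one shared queue, standing in for the Redis list
    for item in items:
        queue.put_nowait(item)
    for _ in range(n_workers):
        queue.put_nowait(None)  # one sentinel per worker
    results = []
    await asyncio.gather(*(worker(queue, executor, results) for _ in range(n_workers)))
    return sorted(results)


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as executor:
        print(asyncio.run(run_workers(executor, [10, 100, 1000])))
```

Passing the executor in as an argument keeps the coroutine code independent of whether the work runs in processes or threads.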