
How to create a global connection with asyncio and redis

I am new to Python 3 and asyncio, coming from gevent and Python 2.7.

How do I create a global connection to Redis that everything will share? E.g. I will have 1 process with e.g. 10 asyncio threads, but I don't want a separate connection per thread. Why? Because I will have e.g. 100 cores with 10 threads per core, and I don't want that many connections to Redis.

import asyncio
import asyncio_redis

async def worker():
    while True:
        data = await connection.brpop(['queue'], timeout=0)
        print(data)
        res = blocking_code(data)
        await connection.set('test',res)

# Process raw data here; all of this code is blocking
def blocking_code(data):
    results = {}
    return results

if __name__ == '__main__':
    connection = asyncio_redis.Connection.create(host='127.0.0.1', port=6379, poolsize=2)
    loop = asyncio.get_event_loop()
    tasks = [asyncio.ensure_future(worker()), asyncio.ensure_future(worker())]
    loop.run_until_complete(asyncio.gather(*tasks))
    connection.close()


    Traceback (most recent call last):
      File "/Users//worker.py", line 14, in <module>
        loop.run_until_complete(example())
      File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 466, in run_until_complete
        return future.result()
      File "/Users//worker.py", line 7, in example
        data = yield from connection.brpop(['queue'], timeout=0)
    AttributeError: 'generator' object has no attribute 'brpop'

So in the above I have two tasks, but I want only one Redis connection.

asked Nov 08 '22 by Tampa

1 Answer

"10 asyncio threads"

Just in case: asyncio coroutines all run in a single thread. Concurrency is achieved by switching between coroutines during I/O operations.
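
For illustration (a sketch, not part of the original answer): two coroutines scheduled on the same loop report the same thread ident and simply interleave at await points.

import asyncio
import threading


async def job(name):
    # Both coroutines print the same thread ident; they only yield control at await points.
    print(name, threading.get_ident())
    await asyncio.sleep(1)


loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(job('a'), job('b')))
loop.close()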

Why doesn't your code work?

asyncio_redis.Connection.create is a coroutine; you should await it (with yield from) to get the connection object from it:

connection = yield from asyncio_redis.Connection.create(host='127.0.0.1', port=6379)
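
Note that yield from (or await) only works inside a coroutine driven by the event loop, so the connection has to be created there rather than at module level. A minimal sketch of that setup:

import asyncio
import asyncio_redis


@asyncio.coroutine
def init():
    # Runs inside the event loop and returns a ready-to-use connection.
    connection = yield from asyncio_redis.Connection.create(host='127.0.0.1', port=6379)
    return connection


loop = asyncio.get_event_loop()
connection = loop.run_until_complete(init())
# `connection` can now be shared by all workers started on this loop.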

How to create a global connection

If you have only one connection, you'll probably get no benefit from using asyncio: concurrent requests need a pool of connections they can draw from. asyncio_redis provides an easy way to do this, for example:

import asyncio
import asyncio_redis


@asyncio.coroutine
def main():
    connection = yield from asyncio_redis.Pool.create(host='127.0.0.1', port=6379, poolsize=10)
    try:
        # 3 requests running concurrently in a single thread, using connections from the pool:
        yield from asyncio.gather(
            connection.brpop(['queue:pixel'], timeout=0),
            connection.brpop(['queue:pixel'], timeout=0),
            connection.brpop(['queue:pixel'], timeout=0),
        )
    finally:
        connection.close()



if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close() 

Python 3.5+

If you're working with Python 3.5+, consider using the newer async def / await syntax for defining and awaiting coroutines.
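
For example, the pool-based snippet above can be rewritten with async/await, sharing one pool across worker loops like the ones in the question (a sketch only; the queue name and poolsize are placeholders):

import asyncio
import asyncio_redis


async def worker(pool):
    while True:
        data = await pool.brpop(['queue'], timeout=0)
        print(data)


async def main():
    # One shared pool; each concurrent brpop borrows a free connection from it.
    pool = await asyncio_redis.Pool.create(host='127.0.0.1', port=6379, poolsize=10)
    try:
        await asyncio.gather(worker(pool), worker(pool))
    finally:
        pool.close()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()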

Update:

Blocking code (for example, code that needs a lot of CPU time) can't be used inside coroutines directly: it would freeze your event loop and you'd get no benefit from asyncio. This is not related to the number of connections.

You can use run_in_executor to run such code in a separate process without blocking the event loop:

import asyncio

from concurrent.futures import ProcessPoolExecutor


executor = ProcessPoolExecutor(max_workers=10)  # use the number of cores here


# `connection` and `blocking_code` are the ones defined in the question's code.
async def worker():
    while True:
        data = await connection.brpop(['queue'], timeout=0)
        print(data)

        # await blocking_code from a separate process:
        loop = asyncio.get_event_loop()
        result = await loop.run_in_executor(executor, blocking_code, data)
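
As a side note (not from the original answer): ProcessPoolExecutor pickles both the arguments and the return value, so data and the result must be picklable. If the blocking work is I/O-bound rather than CPU-bound, a ThreadPoolExecutor avoids that overhead:

from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=10)  # threads instead of processes, for I/O-bound blocking code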

answered Nov 26 '22 by Mikhail Gerasimov