I'm trying to build a context manager for asyncpg, but so far, I got AttributeError: __enter__, this is what I've tried.
Alsgo I've tried using def __enter__ without async, but doesn't work
What would be the right way to do it?
import asyncpg
import asyncio
class DatabaseConnection(object):
def __init__(self, host, port, database, user, password):
self.connection = None
self.host = host
self.port = port
self.database = database
self.user = user
self.password = password
async def __aenter__(self):
self.connection = await asyncpg.connect(host=self.host,
port=self.port,
database=self.database,
user=self.user,
password=self.password)
return self.connection
async def __aexit__(self, exc_type, exc_val, exc_tb):
# its executed when connection close
self.connection.close()
Then try to use it
async def get_time():
with DatabaseConnection(host=DB_HOST, port=DB_PORT,
database=DB_NAME, user=DB_USER,
password=DB_PASSWORD) as connection:
values = await connection.fetch('''SELECT * CURRENT_TIME''')
print(values)
loop = asyncio.get_event_loop()
loop.run_until_complete(get_time())
For people end up here from search, here is a complete example:
class AsyncDatabaseConnection:
def __init__(self, **params):
self.connection = None
self.params = params
async def __aenter__(self):
self.connection = await asyncpg.connect(**self.params)
return self.connection
async def __aexit__(self, *_):
await self.connection.close()
async def get_data():
async with AsyncDatabaseConnection(host="db", database="dbname", user="username", password="pass") as conn:
return await conn.fetchrow("""SELECT * FROM some_table LIMIT 1;""")
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With