Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to do transactions in python with asyncio and postgres?

There are two operations in my RPC method:

async def my_rpc(self, data):
    async with self.Engine() as conn:
        await conn.execute("SELECT ... FROM MyTable");
        ...  # It seems the table MyTable can be changed by another RPC
        await conn.execute("UPDATA MyTable ...");

Another RPC method can change DB before operation "my_rpc" will be done (between two awaits of SQL queries). How to avoid this situation?

Code of self.Engine (calls with engine aiopg.sa.create_engine):

class ConnectionContextManager(object):
    def __init__(self, engine):
        self.conn = None
        self.engine = engine

    async def __aenter__(self):
        if self.engine:
            self.conn = await self.engine.acquire()
            return self.conn

    async def __aexit__(self, exc_type, exc, tb):
        try:
            self.engine.release(self.conn)
            self.conn.close()
        finally:
            self.conn = None
            self.engine = None
like image 250
Artem Selivanov Avatar asked Mar 11 '23 13:03

Artem Selivanov


1 Answers

Firstly, aiopg works in autocommit mode, meaning that you have to use transaction in manual mode. Read more details.

Secondly, you have to use SELECT FOR UPDATE for lock row that you read in first statement. SELECT FOR UPDATE locks select rows while until the transaction completes. Read more details.

async def my_rpc(self, data):
    async with self.Engine() as conn:
        await conn.execute("BEGIN")
        await conn.execute("SELECT ... FROM MyTable WHERE some_clause = some_value FOR UPDATE")
        ...  # It seems the table MyTable can be changed by another RPC
        await conn.execute("UPDATE MyTable SET some_clause=...")
        await conn.execute("""COMMIT""")
like image 93
Peter Avatar answered Apr 27 '23 00:04

Peter