I'm using SQLAlchemy 1.4.18 (async) and I believe I'm running into a race condition that I can't explain. The underlying database is Postgres, and asyncpg is used internally by SQLAlchemy.
I have the following insert function written with SQLAlchemy Core:
from datetime import datetime, timedelta
from typing import Optional

# DeviceType, AccountType, DeviceTable, and engine are defined elsewhere in the app.


async def create_device(
    device_id: str,
    device_type: DeviceType,
    account_type: AccountType = AccountType.FREE,
    expires_at: Optional[datetime] = None,
    account_id: Optional[int] = None,
    is_banned: bool = False,
    last_login_at: Optional[datetime] = None,
) -> datetime:
    if expires_at is None:
        expires_at = datetime.utcnow().replace(second=0, microsecond=0) + timedelta(
            days=7
        )
    async with engine.begin() as conn:
        await conn.execute(
            DeviceTable.insert().values(
                id=device_id,
                type=device_type,
                expires_at=expires_at,
                account_type=account_type,
                account_id=account_id,
                is_banned=is_banned,
                last_login_at=last_login_at,
            ),
        )
    return expires_at
The unit test passes when run on its own. However, when I run all the tests in the test class, this test fails every time.
@pytest.mark.asyncio
@patch("service.email_service.EmailService.confirm_token")
async def test_confirm_email_already_confirmed(self, mock_token, client):
    expiry_date = self.get_time_in_future()
    account_id = await crud_account.create_account(
        "[email protected]", "pass1", is_confirmed=True
    )
    await crud_device.create_device(
        "u1", DeviceType.IPHONE, account_id=account_id, expires_at=expiry_date
    )
    # It has already failed at this point.
    mock_token.return_value = "[email protected]"
    result = await client.get("/email/confirm/t1")
    assert result.status_code == 200
Error:
../app/database/crud_device.py:26: in create_device
await conn.execute(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/ext/asyncio/engine.py:405: in execute
result = await greenlet_spawn(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py:125: in greenlet_spawn
result = context.throw(*sys.exc_info())
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1582: in _execute_20
return meth(self, args_10style, kwargs_10style, execution_options)
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/sql/elements.py:324: in _execute_on_connection
return connection._execute_clauseelement(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1451: in _execute_clauseelement
ret = self._execute_context(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1813: in _execute_context
self._handle_dbapi_exception(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1994: in _handle_dbapi_exception
util.raise_(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/util/compat.py:207: in raise_
raise exception
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1770: in _execute_context
self.dialect.do_execute(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/engine/default.py:717: in do_execute
cursor.execute(statement, parameters)
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py:449: in execute
self._adapt_connection.await_(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py:67: in await_only
return current.driver.switch(awaitable)
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py:120: in greenlet_spawn
value = await result
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py:424: in _prepare_and_execute
self._handle_exception(error)
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py:358: in _handle_exception
self._adapt_connection._handle_exception(error)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <sqlalchemy.dialects.postgresql.asyncpg.AsyncAdapt_asyncpg_connection object at 0x115007340>
error = InternalServerError('cache lookup failed for type 3912040')
    def _handle_exception(self, error):
        if self._connection.is_closed():
            self._transaction = None
            self._started = False

        if not isinstance(error, AsyncAdapt_asyncpg_dbapi.Error):
            exception_mapping = self.dbapi._asyncpg_error_translate

            for super_ in type(error).__mro__:
                if super_ in exception_mapping:
                    translated_error = exception_mapping[super_](
                        "%s: %s" % (type(error), error)
                    )
                    translated_error.pgcode = (
                        translated_error.sqlstate
                    ) = getattr(error, "sqlstate", None)
>                   raise translated_error from error
E sqlalchemy.exc.InternalError: (sqlalchemy.dialects.postgresql.asyncpg.InternalServerError) <class 'asyncpg.exceptions.InternalServerError'>: cache lookup failed for type 3912040
E [SQL: INSERT INTO main.device (id, type, created_at, last_login_at, expires_at, account_type, is_banned, account_id) VALUES (%s, %s, now(), NULL, %s, %s, %s, %s)]
E [parameters: ('u1', 'IPHONE', datetime.datetime(2021, 6, 16, 10, 24), 'FREE', False, 1)]
E (Background on this error at: http://sqlalche.me/e/14/2j85)
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py:652: InternalError
What is this translated_error? Many thanks.
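To answer the direct question first: translated_error is SQLAlchemy's DBAPI-style wrapper around the native asyncpg exception. As the _handle_exception source in the traceback shows, the asyncpg adapter walks the error's class hierarchy (its MRO) through an exception-translation mapping and re-raises a DBAPI-compatible class, chaining the original via raise ... from error. A rough sketch of the idea follows; the names TranslatedInternalError, exception_mapping, and translate are illustrative stand-ins, not SQLAlchemy's actual internals:

import asyncpg


class TranslatedInternalError(Exception):
    """Illustrative stand-in for the dialect's DBAPI-style error class."""


# Illustrative mapping; the real one is the dialect's _asyncpg_error_translate.
exception_mapping = {asyncpg.exceptions.InternalServerError: TranslatedInternalError}


def translate(error: BaseException) -> BaseException:
    # Walk the MRO so subclasses of mapped asyncpg errors translate too.
    for super_ in type(error).__mro__:
        if super_ in exception_mapping:
            return exception_mapping[super_]("%s: %s" % (type(error), error))
    return error

So the InternalError you see is the translated class, with the original asyncpg InternalServerError ("cache lookup failed for type ...") attached as its cause.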
I had help with this from the SQLAlchemy maintainers.
As Nadir suggested in the other answer, one way to resolve this is to disable the caching; however, this comes with a performance hit.
The correct way to solve it is to dispose of the async engine in your unit-test teardown. This assumes you are using a sync engine in your test_base.py.
Database.py
from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine(
    f"postgresql+asyncpg://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
)
test_base.py
import asyncio

from sqlalchemy import create_engine

from database.database import metadata, engine

engine_sync = create_engine(
    f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
)


class TestBase:
    async def dispose(self):
        await engine.dispose()

    def teardown(self):
        metadata.drop_all(engine_sync)
        asyncio.run(self.dispose())
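If your test suite is fully async, a variant of the same idea is to dispose the engine from an autouse async fixture instead of calling asyncio.run() in a sync teardown. This is only a sketch, assuming pytest-asyncio is installed; depending on its version the fixture decorator may differ:

import pytest

from database.database import engine


@pytest.fixture(autouse=True)
async def dispose_engine_after_each_test():
    yield
    # Dispose the pool so the next test gets fresh connections; stale pooled
    # connections hold asyncpg statement caches that still reference the
    # enum type OIDs dropped by metadata.drop_all().
    await engine.dispose()

Either way, the point is the same: disposing the engine closes the pooled connections, and with them the per-connection asyncpg prepared statement caches that still reference the dropped type OIDs.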
I was able to solve the problem. It appears to be an issue with the asyncpg driver's prepared statement cache. This seems to be a frequent issue, because they mention it in their FAQ here.
I first attempted to turn off this caching behavior by setting query_cache_size=0 in SQLAlchemy's create_async_engine. This is mentioned in the SQLAlchemy documentation:
async_engine = create_async_engine(
    f"postgresql+asyncpg://{settings.POSTGRES_USER}:{settings.POSTGRES_PASSWORD}@{settings.POSTGRES_SERVER}/{settings.POSTGRES_DB}",
    echo=True,
    query_cache_size=0,
)
However, looking at the SQL logged by echo, it appeared that the cache was still in use. I think this is because I had misunderstood: asyncpg and SQLAlchemy each implement a statement cache of their own, and the SQLAlchemy param doesn't affect asyncpg's behavior.
I then found some discussion about the asyncpg prepared statement cache on SQLAlchemy GitHub Issue 6467.
Based on a comment in that thread, I was able to solve the problem by passing prepared_statement_cache_size=0 as a query param directly in the PostgreSQL URI. The new, working create_async_engine looks like:
async_engine = create_async_engine(
    f"postgresql+asyncpg://{settings.POSTGRES_USER}:{settings.POSTGRES_PASSWORD}@{settings.POSTGRES_SERVER}/{settings.POSTGRES_DB}?prepared_statement_cache_size=0",
    echo=True,
)
NOTE: The query_cache_size SQLAlchemy param didn't affect this issue, so I removed it to let SQLAlchemy cache its compiled SQL statements using the default behavior in 1.4.
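As a side note, the same dialect option can, as far as I can tell, also be passed programmatically via connect_args rather than in the URL; treat this as an assumption and verify against your SQLAlchemy version:

# Assumed alternative: pass the asyncpg dialect option via connect_args
# instead of the URL query string (verify against your SQLAlchemy version).
async_engine = create_async_engine(
    f"postgresql+asyncpg://{settings.POSTGRES_USER}:{settings.POSTGRES_PASSWORD}@{settings.POSTGRES_SERVER}/{settings.POSTGRES_DB}",
    echo=True,
    connect_args={"prepared_statement_cache_size": 0},
)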