
SQLAlchemy 1.4 throws InternalServerError (cache lookup failed for type 3912040)

I'm using SQLAlchemy 1.4.18 (async) and I believe I'm running into a race condition that I can't explain. The underlying database is PostgreSQL, and SQLAlchemy uses asyncpg as the driver.

I have the following insert function written with SQLAlchemy Core.

async def create_device(
    device_id: str,
    device_type: DeviceType,
    account_type: AccountType = AccountType.FREE,
    expires_at: Optional[datetime] = None,
    account_id: Optional[int] = None,
    is_banned: bool = False,
    last_login_at: Optional[datetime] = None,
) -> datetime:
    if expires_at is None:
        expires_at = datetime.utcnow().replace(second=0, microsecond=0) + timedelta(
            days=7
        )
    async with engine.begin() as conn:
        await conn.execute(
            DeviceTable.insert().values(
                id=device_id,
                type=device_type,
                expires_at=expires_at,
                account_type=account_type,
                account_id=account_id,
                is_banned=is_banned,
                last_login_at=last_login_at,
            ),
        )
        return expires_at

The unit test passes when run on its own. However, when I run all tests in the test class, this test fails every time.

    @pytest.mark.asyncio
    @patch("service.email_service.EmailService.confirm_token")
    async def test_confirm_email_already_confirmed(self, mock_token, client):
        expiry_date = self.get_time_in_future()
        account_id = await crud_account.create_account(
            "[email protected]", "pass1", is_confirmed=True
        )
        await crud_device.create_device(
            "u1", DeviceType.IPHONE, account_id=account_id, expires_at=expiry_date
        )
        # It has already failed at this point.
        mock_token.return_value = "[email protected]"
        result = await client.get("/email/confirm/t1")
        assert result.status_code == 200

Error:

../app/database/crud_device.py:26: in create_device
    await conn.execute(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/ext/asyncio/engine.py:405: in execute
    result = await greenlet_spawn(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py:125: in greenlet_spawn
    result = context.throw(*sys.exc_info())
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1582: in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/sql/elements.py:324: in _execute_on_connection
    return connection._execute_clauseelement(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1451: in _execute_clauseelement
    ret = self._execute_context(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1813: in _execute_context
    self._handle_dbapi_exception(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1994: in _handle_dbapi_exception
    util.raise_(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/util/compat.py:207: in raise_
    raise exception
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/engine/base.py:1770: in _execute_context
    self.dialect.do_execute(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/engine/default.py:717: in do_execute
    cursor.execute(statement, parameters)
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py:449: in execute
    self._adapt_connection.await_(
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py:67: in await_only
    return current.driver.switch(awaitable)
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py:120: in greenlet_spawn
    value = await result
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py:424: in _prepare_and_execute
    self._handle_exception(error)
../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py:358: in _handle_exception
    self._adapt_connection._handle_exception(error)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <sqlalchemy.dialects.postgresql.asyncpg.AsyncAdapt_asyncpg_connection object at 0x115007340>
error = InternalServerError('cache lookup failed for type 3912040')

    def _handle_exception(self, error):
        if self._connection.is_closed():
            self._transaction = None
            self._started = False
    
        if not isinstance(error, AsyncAdapt_asyncpg_dbapi.Error):
            exception_mapping = self.dbapi._asyncpg_error_translate
    
            for super_ in type(error).__mro__:
                if super_ in exception_mapping:
                    translated_error = exception_mapping[super_](
                        "%s: %s" % (type(error), error)
                    )
                    translated_error.pgcode = (
                        translated_error.sqlstate
                    ) = getattr(error, "sqlstate", None)
>                   raise translated_error from error
E                   sqlalchemy.exc.InternalError: (sqlalchemy.dialects.postgresql.asyncpg.InternalServerError) <class 'asyncpg.exceptions.InternalServerError'>: cache lookup failed for type 3912040
E                   [SQL: INSERT INTO main.device (id, type, created_at, last_login_at, expires_at, account_type, is_banned, account_id) VALUES (%s, %s, now(), NULL, %s, %s, %s, %s)]
E                   [parameters: ('u1', 'IPHONE', datetime.datetime(2021, 6, 16, 10, 24), 'FREE', False, 1)]
E                   (Background on this error at: http://sqlalche.me/e/14/2j85)

../../../.pyenv/versions/venv/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py:652: InternalError

What is this translated_error? Many thanks

Houman asked Oct 14 '22 20:10


2 Answers

I had help with this from the SQLAlchemy maintainers.

As Nadir suggested in the other answer, one way to resolve this is to disable the caching; however, that comes with a performance cost.

The correct way to solve it is to dispose of the async engine in the teardown of your unit tests. This assumes that you use a separate sync engine in test_base.py to create and drop the schema.

database.py

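from sqlalchemy.ext.asyncio import create_async_engine

# The {DB_...} fields are placeholders for your own connection settings.
engine = create_async_engine(
    "postgresql+asyncpg://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
)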

test_base.py

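import asyncio

from sqlalchemy import create_engine

from database.database import metadata, engine

# Sync engine used only to drop the schema; the {DB_...} fields are placeholders.
engine_sync = create_engine(
    "postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
)

class TestBase:
    async def dispose(self):
        # Close the pooled asyncpg connections so their cached state is discarded.
        await engine.dispose()

    def teardown(self):
        metadata.drop_all(engine_sync)
        asyncio.run(self.dispose())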
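
To illustrate why disposing of the engine helps, here is a toy model of the failure, not real driver code. It assumes the device table uses PostgreSQL enum types (the DeviceType and AccountType columns suggest this, but the post does not confirm it) and that dropping and recreating the schema gives those types new OIDs. FakeServer and FakePooledConnection are hypothetical stand-ins; nothing here is part of PostgreSQL, asyncpg, or SQLAlchemy.

```python
# Toy model only: these classes are hypothetical and exist purely for illustration.
import itertools


class FakeServer:
    """Stands in for the database: every recreated type gets a fresh OID."""

    def __init__(self):
        self._next_oid = itertools.count(3912040)
        self.types = {}  # type name -> current OID

    def create_all(self):
        self.types["devicetype"] = next(self._next_oid)

    def drop_all(self):
        self.types.clear()

    def execute_prepared(self, type_oid):
        # A statement prepared against a dropped type can no longer be resolved.
        if type_oid not in self.types.values():
            raise LookupError(f"cache lookup failed for type {type_oid}")
        return "INSERT 0 1"


class FakePooledConnection:
    """Stands in for a pooled connection that caches prepared statements."""

    def __init__(self, server):
        self.server = server
        self.cached_oid = None  # remembered from the first prepare

    def insert_device(self):
        if self.cached_oid is None:
            self.cached_oid = self.server.types["devicetype"]
        return self.server.execute_prepared(self.cached_oid)

    def dispose(self):
        # Similar in spirit to engine.dispose(): forget all cached state.
        self.cached_oid = None


server = FakeServer()
conn = FakePooledConnection(server)

# First test: schema created, insert works, schema dropped in teardown.
server.create_all()
first = conn.insert_device()
server.drop_all()

# Second test: schema recreated, but the connection still holds the old OID.
server.create_all()
try:
    conn.insert_device()
    stale_error = None
except LookupError as exc:
    stale_error = str(exc)

# Disposing in teardown clears the stale state, so the next insert succeeds.
conn.dispose()
after_dispose = conn.insert_device()
```

In this sketch the second insert fails only because the connection outlived the schema it prepared its statement against, which matches the symptom of a test that passes alone but fails when run after other tests.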
Houman answered Nov 04 '22 01:11


I was able to solve the problem. It appears to be an issue with asyncpg's prepared statement cache. This seems to be a common issue, since the asyncpg FAQ mentions it.

I first tried to turn off this caching by setting query_cache_size=0 in SQLAlchemy's create_async_engine, as mentioned in the SQLAlchemy documentation:

async_engine = create_async_engine(
    f"postgresql+asyncpg://{settings.POSTGRES_USER}:{settings.POSTGRES_PASSWORD}@{settings.POSTGRES_SERVER}/{settings.POSTGRES_DB}",
    echo=True,
    query_cache_size=0
)

However, the SQL logged by echo showed that a cache was still in use. I had misunderstood the setup: asyncpg and SQLAlchemy each implement their own statement cache, and the SQLAlchemy parameter doesn't affect asyncpg's behavior.

I then found some discussion about the asyncpg prepared statement cache on SQLAlchemy GitHub Issue 6467.

Based on a comment in that thread, I solved the problem by passing prepared_statement_cache_size=0 as a query parameter directly in the PostgreSQL URI. The new, working create_async_engine call looks like this:

async_engine = create_async_engine(
    f"postgresql+asyncpg://{settings.POSTGRES_USER}:{settings.POSTGRES_PASSWORD}@{settings.POSTGRES_SERVER}/{settings.POSTGRES_DB}?prepared_statement_cache_size=0",
    echo=True
)

NOTE: The SQLAlchemy query_cache_size parameter didn't affect this issue, so I removed it. That lets SQLAlchemy cache its compiled SQL statements using the default behavior in 1.4.
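
If the database URL is assembled elsewhere (for example from settings), the query parameter can be appended programmatically instead of being hard-coded. The following is a minimal sketch using only the standard library; disable_asyncpg_statement_cache is a hypothetical helper name, and the credentials in the example URL are made up.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def disable_asyncpg_statement_cache(db_url: str) -> str:
    """Return db_url with prepared_statement_cache_size=0 in its query string."""
    parts = urlsplit(db_url)
    # Keep existing query parameters, dropping any previous cache-size setting.
    query = [
        (key, value)
        for key, value in parse_qsl(parts.query, keep_blank_values=True)
        if key != "prepared_statement_cache_size"
    ]
    query.append(("prepared_statement_cache_size", "0"))
    return urlunsplit(parts._replace(query=urlencode(query)))


# Example URL with made-up credentials; substitute your own settings.
url = disable_asyncpg_statement_cache(
    "postgresql+asyncpg://app_user:secret@localhost:5432/app_db"
)
print(url)
```

The resulting string can then be passed to create_async_engine in place of the hand-written URI above.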

Nadir Sidi answered Nov 03 '22 23:11