I have been working on yet another API using FastAPI and am trying to write test cases for its endpoints, but I am facing an error saying that the event loop is closed.
My setup:
I am using the asyncpg driver to connect to the main database and, for the test cases, the aiosqlite driver to connect to a file-based SQLite database (db.sqlite3).
factories.py (kindly ignore this as this is just to generate fake data)
import factory
from factory.enums import CREATE_STRATEGY
from sqlalchemy.orm import Session
from sqlalchemy.ext.asyncio import AsyncSession
from factory.fuzzy import FuzzyChoice
from app.models import User
class SQLAlchemyModelFactory(factory.alchemy.SQLAlchemyModelFactory):
    """Abstract factory base that persists instances through an ``AsyncSession``.

    ``create`` and ``_save`` are coroutines, so callers must ``await``
    ``SomeFactory.create(...)``.
    """

    class Meta:
        abstract = True
        sqlalchemy_session = None
        sqlalchemy_session_persistence = "commit"

    @classmethod
    def set_session(cls, session: AsyncSession):
        """Store *session* on this factory's options (``cls._meta``).

        ``_save`` rejects anything that is not an ``AsyncSession``, hence the
        annotation.
        """
        cls._meta.sqlalchemy_session = session

    @classmethod
    async def _save(cls, model_class, session, args, kwargs):
        """Build a ``model_class`` instance, add it to *session* and commit.

        Raises:
            ValueError: if *session* is not an ``AsyncSession``.
        """
        if not isinstance(session, AsyncSession):
            # Name the factory actually in use rather than a hard-coded class
            # name that does not exist in this module.
            raise ValueError(f"{cls.__name__} requires an AsyncSession")
        obj = model_class(*args, **kwargs)
        session.add(obj)
        await session.commit()
        return obj

    @classmethod
    async def create(cls, **kwargs):
        """Create and persist an instance; must be awaited.

        NOTE(review): presumably ``_generate`` hands back the coroutine
        produced by the async ``_save`` above, which is what gets awaited
        here - confirm against the installed factory_boy version.
        """
        return await cls._generate(CREATE_STRATEGY, kwargs)
class UserFactory(SQLAlchemyModelFactory):
    """Factory producing ``User`` rows filled with fake data."""

    class Meta:
        model = User

    # Identity and credentials
    first_name = factory.Faker("first_name")
    last_name = factory.Faker("last_name")
    email = factory.Faker("email")
    password = factory.Faker("text", max_nb_chars=8)
    gender = FuzzyChoice(("male", "female", "others"))
    mobile = factory.Faker("phone_number")

    # Account flags
    is_superuser = factory.Faker("boolean")
    is_staff = factory.Faker("boolean")
    is_active = factory.Faker("boolean")

    # Timestamps
    created_at = factory.Faker("date_time")
    updated_at = factory.Faker("date_time")
    logged_in_at = factory.Faker("date_time")
    logged_out_at = factory.Faker("date_time")
models.py:
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import relationship, DeclarativeBase
# TEST_DATABASE_FILENAME = "db.sqlite3"
# SQLALCHEMY_TEST_DATABASE_URL = f"sqlite+aiosqlite:///{TEST_DATABASE_FILENAME}"
# SQLALCHEMY_DATABASE_URL="postgresql+asyncpg://myapp:password@localhost:5432/appdb"

# Module-level async engine.
# NOTE(review): SQLALCHEMY_DATABASE_URL is only shown commented out above; it
# must be defined (or imported) before this statement runs.
engine = create_async_engine(
    SQLALCHEMY_DATABASE_URL,
    echo=False,
    pool_size=10,
    pool_pre_ping=True,
)

# Session factory bound to the engine above.
asyncsession = async_sessionmaker(
    bind=engine,
    class_=AsyncSession,
    autocommit=False,
    autoflush=False,
    expire_on_commit=False,
)
# all the models
app/tests.py (base test class)
import asyncio
import unittest
from httpx import ASGITransport, AsyncClient
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from app.models import Base
class AsyncTestCaseHelper:
    """Schema set-up and tear-down helpers for the test database."""

    @staticmethod
    async def init(engine):
        """Create every table registered on ``Base.metadata``.

        The engine is disposed afterwards so that no pooled connection opened
        on the calling event loop is handed to a different loop later.
        """
        try:
            async with engine.begin() as conn:
                await conn.run_sync(Base.metadata.create_all)
        finally:
            await engine.dispose()

    @staticmethod
    async def cleanup(engine):
        """Drop all tables, dispose *engine* and delete the test database file.

        Disposal and file removal run even when dropping the tables fails; the
        original exception still propagates.
        """
        import contextlib
        import os

        try:
            async with engine.begin() as conn:
                await conn.run_sync(Base.metadata.drop_all)
        finally:
            # Connections must be released before the file is removed.
            await engine.dispose()
            # A missing file is fine; avoids the check-then-remove race.
            with contextlib.suppress(FileNotFoundError):
                os.remove(TEST_DATABASE_FILENAME)
class AsyncAPITestCase(unittest.IsolatedAsyncioTestCase):
    """Base class for API tests: one test engine per class, one session and
    HTTP client per test.

    Subclasses set ``router`` to the ASGI application under test.

    NOTE(review): requests sent through ``self.client`` run inside that
    application; whether it uses ``cls.asyncsession`` or its own session
    factory cannot be seen from this class - confirm the application is
    pointed at the test database.
    """

    router = None

    @classmethod
    def setUpClass(cls):
        """Create the test engine and session factory, then build the schema."""
        super().setUpClass()
        cls.engine = create_async_engine(
            SQLALCHEMY_TEST_DATABASE_URL,
            connect_args={"check_same_thread": False},
        )
        cls.asyncsession = async_sessionmaker(
            bind=cls.engine, class_=AsyncSession, expire_on_commit=False
        )
        asyncio.run(AsyncTestCaseHelper.init(cls.engine))

    @classmethod
    def tearDownClass(cls):
        """Drop the schema and clear the class-level engine and factory."""
        try:
            asyncio.run(AsyncTestCaseHelper.cleanup(cls.engine))
        finally:
            # Clear the references even if cleanup raised.
            cls.engine = None
            cls.asyncsession = None
            super().tearDownClass()

    async def asyncSetUp(self):
        """Open a session and an in-process HTTP client for one test."""
        self.db_session = self.asyncsession()
        self.transport = ASGITransport(app=self.router)
        self.client = AsyncClient(transport=self.transport, base_url="http://test")

    async def asyncTearDown(self):
        """Roll back and release every per-test resource.

        Callbacks run in reverse registration order (session close, client,
        transport, engine dispose) and all of them run even if the rollback
        or an earlier callback raises.
        """
        from contextlib import AsyncExitStack

        async with AsyncExitStack() as stack:
            stack.push_async_callback(self.engine.dispose)
            stack.push_async_callback(self.transport.aclose)
            stack.push_async_callback(self.client.aclose)
            stack.push_async_callback(self.db_session.close)
            await self.db_session.rollback()
api/tests.py (the actual file where test cases are written)
import factories
import pytest
from starlette import status
from api.controller import apirouter
from app.tests import AsyncAPITestCase
class TestToken(AsyncAPITestCase):
    """Tests for the ``token-signin`` endpoint.

    The test methods carry no ``pytest.mark.asyncio`` marker: the base class
    derives from ``unittest.IsolatedAsyncioTestCase``, which already runs
    coroutine test methods itself.
    """

    router = apirouter

    async def asyncSetUp(self):
        """Bind the factories to this test's session and create two users."""
        await super().asyncSetUp()
        factories.UserFactory.set_session(session=self.db_session)
        factories.PermissionFactory.set_session(session=self.db_session)
        factories.ContentTypeFactory.set_session(session=self.db_session)
        factories.GroupFactory.set_session(session=self.db_session)

        # Create test users
        # NOTE(review): both addresses below are the same placeholder text in
        # this listing; presumably they were redacted - use two distinct
        # addresses in the real test.
        self.correct_user = await factories.UserFactory.create(
            email="[email protected]",
            password="test@12345"
        )
        self.incorrect_user = await factories.UserFactory.create(
            email="[email protected]",
            password="wrongpass"
        )
        await self.db_session.commit()
        await self.db_session.refresh(self.correct_user)
        await self.db_session.refresh(self.incorrect_user)

    async def test_token_generation_with_invalid_credential(self):
        """Signing in as ``incorrect_user`` is expected to return 401."""
        url = apirouter.url_path_for("token-signin")
        response = await self.client.post(url, json={
            "email": self.incorrect_user.email,
            "password": self.incorrect_user.password,
        })
        self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)

    async def test_token_generation_with_valid_credential(self):
        """Signing in as ``correct_user`` is expected to return 200."""
        url = apirouter.url_path_for("token-signin")
        response = await self.client.post(url, json={
            "email": self.correct_user.email,
            "password": self.correct_user.password,
        })
        self.assertEqual(response.status_code, status.HTTP_200_OK)
requirements.txt
fastapi
uvicorn
sqlalchemy
asyncpg
factory_boy
httpx
aiosqlite
pytest
pytest-asyncio
faker
When I run my test cases with: pytest api\tests.py -v
it gives the error below (whole traceback):
self = <ProactorEventLoop running=False closed=True debug=True>
def _check_closed(self):
if self._closed:
> raise RuntimeError('Event loop is closed')
E RuntimeError: Event loop is closed
C:\Users\Anura\miniconda3\Lib\asyncio\base_events.py:540: RuntimeError
During handling of the above exception, another exception occurred:
self = <api.tests.TestToken testMethod=test_token_generation_with_valid_credential>
async def test_token_generation_with_valid_credential(self):
url = apirouter.url_path_for("token-signin")
> response = await self.client.post(url, json={
"email": self.correct_user.email,
"password": self.correct_user.password,
})
api\tests.py:111:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
venv\Lib\site-packages\httpx\_client.py:1859: in post
return await self.request(
venv\Lib\site-packages\httpx\_client.py:1540: in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
venv\Lib\site-packages\httpx\_client.py:1629: in send
response = await self._send_handling_auth(
venv\Lib\site-packages\httpx\_client.py:1657: in _send_handling_auth
response = await self._send_handling_redirects(
venv\Lib\site-packages\httpx\_client.py:1694: in _send_handling_redirects
response = await self._send_single_request(request)
venv\Lib\site-packages\httpx\_client.py:1730: in _send_single_request
response = await transport.handle_async_request(request)
venv\Lib\site-packages\httpx\_transports\asgi.py:170: in handle_async_request
await self.app(scope, receive, send)
venv\Lib\site-packages\starlette\routing.py:714: in __call__
await self.middleware_stack(scope, receive, send)
venv\Lib\site-packages\starlette\routing.py:734: in app
await route.handle(scope, receive, send)
venv\Lib\site-packages\starlette\routing.py:288: in handle
await self.app(scope, receive, send)
venv\Lib\site-packages\starlette\routing.py:76: in app
await wrap_app_handling_exceptions(app, request)(scope, receive, send)
venv\Lib\site-packages\starlette\_exception_handler.py:53: in wrapped_app
raise exc
venv\Lib\site-packages\starlette\_exception_handler.py:42: in wrapped_app
await app(scope, receive, sender)
venv\Lib\site-packages\starlette\routing.py:73: in app
response = await f(request)
venv\Lib\site-packages\fastapi\routing.py:301: in app
raw_response = await run_endpoint_function(
venv\Lib\site-packages\fastapi\routing.py:212: in run_endpoint_function
return await dependant.call(**values)
fastapi_extensions\views.py:148: in dispatch_request
return await super().dispatch_request(request=request)
fastapi_extensions\views.py:76: in dispatch_request
return await method(request)
api\views.py:20: in post
user = await User.fetch(email = data.email, password = data.password, first_only=True)
fastapi_extensions\models.py:77: in fetch
result = await session.execute(select(cls).filter_by(**kwargs))
venv\Lib\site-packages\sqlalchemy\ext\asyncio\session.py:463: in execute
result = await greenlet_spawn(
venv\Lib\site-packages\sqlalchemy\util\_concurrency_py3k.py:201: in greenlet_spawn
result = context.throw(*sys.exc_info())
venv\Lib\site-packages\sqlalchemy\orm\session.py:2365: in execute
return self._execute_internal(
venv\Lib\site-packages\sqlalchemy\orm\session.py:2241: in _execute_internal
conn = self._connection_for_bind(bind)
venv\Lib\site-packages\sqlalchemy\orm\session.py:2110: in _connection_for_bind
return trans._connection_for_bind(engine, execution_options)
<string>:2: in _connection_for_bind
???
venv\Lib\site-packages\sqlalchemy\orm\state_changes.py:139: in _go
ret_value = fn(self, *arg, **kw)
venv\Lib\site-packages\sqlalchemy\orm\session.py:1189: in _connection_for_bind
conn = bind.connect()
venv\Lib\site-packages\sqlalchemy\engine\base.py:3274: in connect
return self._connection_cls(self)
venv\Lib\site-packages\sqlalchemy\engine\base.py:146: in __init__
self._dbapi_connection = engine.raw_connection()
venv\Lib\site-packages\sqlalchemy\engine\base.py:3298: in raw_connection
return self.pool.connect()
venv\Lib\site-packages\sqlalchemy\pool\base.py:449: in connect
return _ConnectionFairy._checkout(self)
venv\Lib\site-packages\sqlalchemy\pool\base.py:1363: in _checkout
with util.safe_reraise():
venv\Lib\site-packages\sqlalchemy\util\langhelpers.py:146: in __exit__
raise exc_value.with_traceback(exc_tb)
venv\Lib\site-packages\sqlalchemy\pool\base.py:1301: in _checkout
result = pool._dialect._do_ping_w_event(
venv\Lib\site-packages\sqlalchemy\engine\default.py:720: in _do_ping_w_event
return self.do_ping(dbapi_connection)
venv\Lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py:1163: in do_ping
dbapi_connection.ping()
venv\Lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py:813: in ping
self._handle_exception(error)
venv\Lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py:794: in _handle_exception
raise error
venv\Lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py:811: in ping
_ = self.await_(self._async_ping())
venv\Lib\site-packages\sqlalchemy\util\_concurrency_py3k.py:132: in await_only
return current.parent.switch(awaitable) # type: ignore[no-any-return,attr-defined] # noqa: E501
venv\Lib\site-packages\sqlalchemy\util\_concurrency_py3k.py:196: in greenlet_spawn
value = await result
venv\Lib\site-packages\sqlalchemy\dialects\postgresql\asyncpg.py:820: in _async_ping
await tr.start()
venv\Lib\site-packages\asyncpg\transaction.py:146: in start
await self._connection.execute(query)
venv\Lib\site-packages\asyncpg\connection.py:349: in execute
result = await self._protocol.query(query, timeout)
asyncpg\\protocol\\protocol.pyx:375: in query
???
asyncpg\\protocol\\protocol.pyx:368: in asyncpg.protocol.protocol.BaseProtocol.query
???
asyncpg\\protocol\\coreproto.pyx:1174: in asyncpg.protocol.protocol.CoreProtocol._simple_query
???
asyncpg\\protocol\\protocol.pyx:967: in asyncpg.protocol.protocol.BaseProtocol._write
???
C:\Users\Anura\miniconda3\Lib\asyncio\proactor_events.py:366: in write
self._loop_writing(data=bytes(data))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_ProactorSocketTransport fd=1556 read=<_OverlappedFuture cancelled created at C:\Users\Anura\miniconda3\Lib\asyncio\windows_events.py:506>>, f = None, data = b'Q\x00\x00\x00\x0bBEGIN;\x00'
def _loop_writing(self, f=None, data=None):
try:
if f is not None and self._write_fut is None and self._closing:
# XXX most likely self._force_close() has been called, and
# it has set self._write_fut to None.
return
assert f is self._write_fut
self._write_fut = None
self._pending_write = 0
if f:
f.result()
if data is None:
data = self._buffer
self._buffer = None
if not data:
if self._closing:
self._loop.call_soon(self._call_connection_lost, None)
if self._eof_written:
self._sock.shutdown(socket.SHUT_WR)
# Now that we've reduced the buffer size, tell the
# protocol to resume writing if it was paused. Note that
# we do this last since the callback is called immediately
# and it may add more data to the buffer (even causing the
# protocol to be paused again).
self._maybe_resume_protocol()
else:
> self._write_fut = self._loop._proactor.send(self._sock, data)
E AttributeError: 'NoneType' object has no attribute 'send'
In short, the error happens here:
async def test_token_generation_with_valid_credential(self):
url = apirouter.url_path_for("token-signin")
> response = await self.client.post(url, json={
"email": self.correct_user.email,
"password": self.correct_user.password,
})
AttributeError: 'NoneType' object has no attribute 'send'
# which is in fact due to
def _check_closed(self):
if self._closed:
> raise RuntimeError('Event loop is closed')
E RuntimeError: Event loop is closed
Note: if I run a single test case it passes, but if I run two or more test cases together they fail with the error shown above.
My attempts:
I tried rewriting my test cases in pure pytest and pytest_asyncio style (using fixtures and so on), but I still had the same issue. However, I noticed that if I use poolclass=NullPool for both of my connections (asyncpg and aiosqlite), it somehow works. Setting the asyncpg pool to NullPool would not be ideal for production, though, since creating a new connection for each request introduces slight delays and is generally not good practice.
Kindly let me know if you need anything else from me, such as my attempts with pytest and pytest_asyncio.
thanks for your patience and reading this whole thing :)
Without the full code snippets, it's not easy to fully reproduce the problem. Furthermore, based on usage of ProactorEventLoop , it looks like the code is running on a Windows machine, so anyone running on a Unix based system may get different behavior. However, based on the description, I would recommend the following:
Use a session-scoped pytest fixture to manage the event loop (to @defalt's point), which ensures that the same event loop is used throughout the test session:
import asyncio
@pytest.fixture(scope="session", autouse=True)
def event_loop():
    """Yield one event loop for the whole test session, then close it."""
    policy = asyncio.get_event_loop_policy()
    session_loop = policy.new_event_loop()
    # Make the new loop the current loop before handing it to the tests.
    asyncio.set_event_loop(session_loop)
    yield session_loop
    session_loop.close()
Migrate to using pytest consistently, rather than mixing unittest and pytest, as the two test frameworks manage event loops differently (and, per the point above, the intention is that the event_loop fixture controls the loop and policy). This should be a matter of converting the corresponding asyncSetUp and asyncTearDown methods into an appropriately scoped (e.g. "module") fixture.
Decorate the asynchronous test functions with @pytest.mark.asyncio , or have the asyncio_mode pytest configurations set explicitly in a pyproject.toml or pytest.ini , if those are utilized in the project.
After trying the changes detailed above, re-run the tests and report back on whether they mitigate the issue. The alternative would be to migrate fully to unittest, if that is the preferred approach. The same concept applies: event loop management should be handled by a single test framework, most likely in the set-up and tear-down methods.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With