
RuntimeError: Event loop is closed - motor, asyncio

I am not able to run this test; I always get the same error: RuntimeError: Event loop is closed

What do I need to add to this code?

from motor.motor_asyncio import AsyncIOMotorClient
import pytest
import asyncio

client = AsyncIOMotorClient("mongodb://mongo:[email protected]:27017/admin?retryWrites=false")
db = client['app']
aux = db['users']

async def create_user_db(a: dict):
    x = await aux.insert_one(a)
    return x

@pytest.mark.asyncio
async def test_create():
    form = {'username': 'c3', 'password': 'c3'}
    res = await create_user_db(form)
    assert res is not None

This is the error (posted as a screenshot, not preserved here).

Cristoff asked Oct 29 '25 05:10


1 Answer

In your example, you're opening the database at import time, but at that point the event loop the test will use doesn't exist yet. That event loop is created when the test case runs.
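The error message itself can be reproduced with the standard library alone. The sketch below is only an analogy: the loop created here stands in for whatever loop a module-level object ended up tied to, and how Motor binds to a loop depends on the Motor version. It shows that scheduling work on a loop that has already been closed raises this exact RuntimeError.

```python
import asyncio


def schedule_on_closed_loop() -> str:
    # Stand-in for a loop that some module-level object is still tied to.
    loop = asyncio.new_event_loop()
    loop.run_until_complete(asyncio.sleep(0))
    loop.close()  # e.g. the test runner has finished with this loop

    try:
        # Any attempt to schedule a callback on the closed loop fails.
        loop.call_soon(print, "never runs")
    except RuntimeError as exc:
        return str(exc)
    return "no error"


message = schedule_on_closed_loop()
print(message)  # prints "Event loop is closed"
```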

You could define your client, database and collection as fixtures and pass them to the test functions, e.g.:

import pytest
from motor.motor_asyncio import AsyncIOMotorClient


@pytest.fixture
def client():
    # Created when a test requests it, not at import time
    return AsyncIOMotorClient("mongodb://localhost:27017/")


@pytest.fixture
def db(client):
    return client['test']


@pytest.fixture
def collection(db):
    return db['test']


# The collection is passed in instead of being a module-level global
async def create_user_db(collection, a: dict):
    x = await collection.insert_one(a)
    return x


@pytest.mark.asyncio
async def test_create(collection):
    form = {'username': 'c3', 'password': 'c3'}
    res = await create_user_db(collection, form)
    assert res is not None
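
To see why creating the client per test helps, here is a minimal sketch that runs without MongoDB. FakeClient is a hypothetical stand-in, not Motor's API: it records the running loop when constructed and schedules its work on that loop. Creating a fresh instance inside each asyncio.run call (roughly what the fixtures do for each test) keeps the client and the loop matched.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for a loop-bound client (not Motor's API)."""

    def __init__(self) -> None:
        # Remember the loop that is running when the client is created.
        self.loop = asyncio.get_running_loop()

    async def insert_one(self, doc: dict) -> dict:
        # Schedule the "insert" on the remembered loop and wait for it.
        fut = self.loop.create_future()
        self.loop.call_soon(fut.set_result, dict(doc, _id=1))
        return await fut


async def create_user(client: FakeClient, doc: dict) -> dict:
    return await client.insert_one(doc)


async def test_body() -> dict:
    # The client is created while this run's loop is active,
    # so it is bound to the loop that actually executes the insert.
    client = FakeClient()
    return await create_user(client, {"username": "c3"})


# Each asyncio.run call uses a new loop, much like separate test cases.
first = asyncio.run(test_body())
second = asyncio.run(test_body())
```

Both runs succeed because no client outlives the loop it was created under.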
erny answered Oct 30 '25 22:10


