I have an async function that calls a streaming api. What is the best way to write unit test for this function? The api response has to be mocked.
I tried with aiounittest and used mock from unittest. But this calls the actual api instead of getting the mocked response. Also tried with pytest.mark.asyncio annotation, but this kept giving me the error - coroutine was never awaited. I have verified that pytest-asyncio has been installed.
I am using VS Code and Python 3.6.6
Here is the relevant code snippet:
async def method1():
response = requests.get(url=url, params=params, stream=True)
for data in response.iter_lines():
# processing logic here
yield data
Pasting some of the tests I tried.
def mocked_get(*args, **kwargs):
#implementation of mock
class TestClass (unittest.TestCase):
@patch("requests.get", side_effect=mocked_get)
async def test_method (self, mock_requests):
resp = []
async for data in method1:
resp.append (data)
#Also tried await method1
assert resp
Also tried with class TestClass (aiounittest.AsyncTestCase):
Use asynctest
instead of aiounittest
.
unittest.TestCase
with asynctest.TestCase
.from unittest.mock import patch
with from asynctest.mock import patch
.async for data in method1:
should be async for data in method1():
.import asynctest
from asynctest.mock import patch
class TestClass(asynctest.TestCase):
@patch("requests.get", side_effect=mocked_get)
async def test_method(self, mock_requests):
resp = []
async for data in method1():
resp.append(data)
assert resp
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With