TL; DR
This was indeed a bug in Motor 1.2.0 that was promptly fixed by A. Jesse Jiryu Davis and is available in version 1.2.1 or greater of the driver.
Original Question
I wrote a program to monitor changes to a MongoDB collection using its new Change Stream feature, on Python 3. Here's the MCVE:
from asyncio import get_event_loop, CancelledError
from contextlib import suppress
from motor.motor_asyncio import AsyncIOMotorClient
async def watch(collection):
async with collection.watch([]) as stream:
async for change in stream:
print(change)
async def cleanup():
task.cancel()
with suppress(CancelledError):
await task
if __name__ == '__main__':
conn = AsyncIOMotorClient()
loop = get_event_loop()
task = loop.create_task(watch(conn.database.collection)) # Replace with a real collection.
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
loop.run_until_complete(cleanup())
loop.shutdown_asyncgens()
loop.close()
When I kill the program with CTRL+C, it raises three different exceptions.
^Cexception calling callback for <Future at 0x102efea58 state=finished raised InvalidStateError>
Traceback (most recent call last):
File "/Users/viotti/motor/lib/python3.6/site-packages/motor/core.py", line 1259, in _next
change = self.delegate.next()
File "/Users/viotti/motor/lib/python3.6/site-packages/pymongo/change_stream.py", line 79, in next
change = self._cursor.next()
File "/Users/viotti/motor/lib/python3.6/site-packages/pymongo/command_cursor.py", line 292, in next
raise StopIteration
StopIteration
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/concurrent/futures/thread.py", line 56, in run
result = self.fn(*self.args, **self.kwargs)
File "/Users/viotti/motor/lib/python3.6/site-packages/motor/core.py", line 1264, in _next
future.set_exception(StopAsyncIteration())
asyncio.base_futures.InvalidStateError: invalid state
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/concurrent/futures/_base.py", line 324, in _invoke_callbacks
callback(self)
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/futures.py", line 414, in _call_set_state
dest_loop.call_soon_threadsafe(_set_state, destination, source)
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 620, in call_soon_threadsafe
self._check_closed()
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Is there a way to make that program close silently?
I'm testing with Python 3.6.4, Motor 1.2 and pymongo 3.6.0 on macOS Sierra.
I think your code is correct, problem on motor
's side.
While investigating I found two problems:
exception calling callback for <Future
error since loop closed before async callbacks done. It seems not to be related to async generators or streams, but to any motor
usage.AgnosticChangeStream
async iteration mechanism (_next function) is written without thinking of case when it cancelled. Try of setting exception to cancelled future leads to InvalidStateError
.This code demonstrates two problems and possible workarounds:
import types
import asyncio
from contextlib import suppress
from motor.motor_asyncio import AsyncIOMotorClient
async def test():
while True:
await asyncio.sleep(0.1)
async def cleanup(task):
task.cancel()
with suppress(asyncio.CancelledError):
await task
def _next(self, future):
try:
if not self.delegate:
self.delegate = self._collection.delegate.watch(**self._kwargs)
change = self.delegate.next()
self._framework.call_soon(self.get_io_loop(),
future.set_result,
change)
except StopIteration:
future.set_exception(StopAsyncIteration())
except Exception as exc:
# CASE 2:
# Cancellation of async iteration (and future with it) happens immediately
# and trying to set exception to cancelled future leads to InvalidStateError,
# we should prevent it:
if future.cancelled():
return
future.set_exception(exc)
async def watch(collection):
async with collection.watch([]) as stream:
# Patch stream to achieve CASE 2:
stream._next = types.MethodType(_next, stream)
async for change in stream:
print(change)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
tmp = asyncio.ensure_future(test()) # Way to receive KeyboardInterrupt immediately.
client = AsyncIOMotorClient()
collection = client.test_database.test_collection
task = asyncio.ensure_future(watch(collection))
try:
loop.run_forever()
except KeyboardInterrupt:
print('KeyboardInterrupt')
finally:
loop.run_until_complete(cleanup(tmp))
loop.run_until_complete(cleanup(task))
# CASE 1:
# Looks like propagating KeyboardInterrupt doesn't affect motor's try
# to establish connection to db and I didn't find a way to stop this manually.
# We should keep event loop alive until we receive ServerSelectionTimeoutError
# and motor would be able to execute it's asyncio callbacks:
loop.run_until_complete(asyncio.sleep(client.server_selection_timeout))
loop.shutdown_asyncgens()
loop.close()
It finishes without warnings/exceptions (on my machine at least) due to added fixes.
I don't recommend you to use hacks above! It's only to demonstrate problem places and possible solutions. I'm not sure it does everything properly.
Instead I advice you to create issue at motor user group / Jira appending there your snippet and probably my answer and wait until bug would be fixed.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With