Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to cleanly shutdown Change Streams with Motor?

TL; DR

This was indeed a bug in Motor 1.2.0 that was promptly fixed by A. Jesse Jiryu Davis and is available in version 1.2.1 or greater of the driver.

Original Question

I wrote a program to monitor changes to a MongoDB collection using its new Change Stream feature, on Python 3. Here's the MCVE:

from asyncio import get_event_loop, CancelledError
from contextlib import suppress
from motor.motor_asyncio import AsyncIOMotorClient

async def watch(collection):
    async with collection.watch([]) as stream:
        async for change in stream:
            print(change)

async def cleanup():
    task.cancel()

    with suppress(CancelledError):
        await task

if __name__ == '__main__':
    conn = AsyncIOMotorClient()
    loop = get_event_loop()
    task = loop.create_task(watch(conn.database.collection))  # Replace with a real collection.

    try:
        loop.run_forever()

    except KeyboardInterrupt:
        pass

    finally:
        loop.run_until_complete(cleanup())
        loop.shutdown_asyncgens()
        loop.close()

When I kill the program with CTRL+C, it raises three different exceptions.

^Cexception calling callback for <Future at 0x102efea58 state=finished raised InvalidStateError>
Traceback (most recent call last):
  File "/Users/viotti/motor/lib/python3.6/site-packages/motor/core.py", line 1259, in _next
    change = self.delegate.next()
  File "/Users/viotti/motor/lib/python3.6/site-packages/pymongo/change_stream.py", line 79, in next
    change = self._cursor.next()
  File "/Users/viotti/motor/lib/python3.6/site-packages/pymongo/command_cursor.py", line 292, in next
    raise StopIteration
StopIteration

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/concurrent/futures/thread.py", line 56, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/viotti/motor/lib/python3.6/site-packages/motor/core.py", line 1264, in _next
    future.set_exception(StopAsyncIteration())
asyncio.base_futures.InvalidStateError: invalid state

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/concurrent/futures/_base.py", line 324, in _invoke_callbacks
    callback(self)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/futures.py", line 414, in _call_set_state
    dest_loop.call_soon_threadsafe(_set_state, destination, source)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 620, in call_soon_threadsafe
    self._check_closed()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

Is there a way to make that program close silently?

I'm testing with Python 3.6.4, Motor 1.2 and pymongo 3.6.0 on macOS Sierra.

like image 778
Rafa Avatar asked Jan 05 '18 12:01

Rafa


1 Answers

I think your code is correct, problem on motor's side.

While investigating I found two problems:

  1. If at the moment you close loop connection is not established, you'll get exception calling callback for <Future error since loop closed before async callbacks done. It seems not to be related to async generators or streams, but to any motor usage.
  2. AgnosticChangeStream async iteration mechanism (_next function) is written without thinking of case when it cancelled. Try of setting exception to cancelled future leads to InvalidStateError.

This code demonstrates two problems and possible workarounds:

import types
import asyncio
from contextlib import suppress
from motor.motor_asyncio import AsyncIOMotorClient


async def test():
    while True:
        await asyncio.sleep(0.1)


async def cleanup(task):
    task.cancel()
    with suppress(asyncio.CancelledError):
        await task


def _next(self, future):
    try:
        if not self.delegate:
            self.delegate = self._collection.delegate.watch(**self._kwargs)

        change = self.delegate.next()
        self._framework.call_soon(self.get_io_loop(),
                                  future.set_result,
                                  change)
    except StopIteration:
        future.set_exception(StopAsyncIteration())
    except Exception as exc:

        # CASE 2:
        # Cancellation of async iteration (and future with it) happens immediately
        # and trying to set exception to cancelled future leads to InvalidStateError,
        # we should prevent it:
        if future.cancelled():
            return

        future.set_exception(exc)


async def watch(collection):
    async with collection.watch([]) as stream:

        # Patch stream to achieve CASE 2:
        stream._next = types.MethodType(_next, stream)

        async for change in stream:
            print(change)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    tmp = asyncio.ensure_future(test())  # Way to receive KeyboardInterrupt immediately.

    client = AsyncIOMotorClient()
    collection = client.test_database.test_collection
    task = asyncio.ensure_future(watch(collection))

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        print('KeyboardInterrupt')
    finally:
        loop.run_until_complete(cleanup(tmp))
        loop.run_until_complete(cleanup(task))

        # CASE 1:
        # Looks like propagating KeyboardInterrupt doesn't affect motor's try
        # to establish connection to db and I didn't find a way to stop this manually.
        # We should keep event loop alive until we receive ServerSelectionTimeoutError
        # and motor would be able to execute it's asyncio callbacks:
        loop.run_until_complete(asyncio.sleep(client.server_selection_timeout))

        loop.shutdown_asyncgens()
        loop.close()

It finishes without warnings/exceptions (on my machine at least) due to added fixes.

I don't recommend you to use hacks above! It's only to demonstrate problem places and possible solutions. I'm not sure it does everything properly.

Instead I advice you to create issue at motor user group / Jira appending there your snippet and probably my answer and wait until bug would be fixed.

like image 107
Mikhail Gerasimov Avatar answered Oct 10 '22 03:10

Mikhail Gerasimov