
How can I notify RxPY observers on separate threads using asyncio?

(Note: The background for this problem is pretty verbose, but you can skip straight to the SSCCE at the bottom.)

Background

I'm trying to develop a Python-based CLI to interact with a web service. In my codebase I have a CommunicationService class that handles all direct communication with the web service. It exposes a received_response property that returns an Observable (from RxPY) that other objects can subscribe to in order to be notified when responses are received back from the web service.

I've based my CLI logic on the click library, where one of my subcommands is implemented as below:

async def enabled(self, request: str, response_handler: Callable[[str], Tuple[bool, str]]) -> Optional[str]:
    self._generate_request(request)
    if response_handler is None:
        return None

    while True:
        response = await self.on_response
        success, value = response_handler(response)
        print(success, value)
        if success:
            return value

What's happening here (in the case that response_handler is not None) is that the subcommand is behaving as a coroutine that awaits responses from the web service (self.on_response == CommunicationService.received_response) and returns some processed value from the first response it can handle.

I'm trying to test the behaviour of my CLI by creating test cases in which CommunicationService is completely mocked; a fake Subject is created (which can act as an Observable) and CommunicationService.received_response is mocked to return it. As part of the test, the subject's on_next method is invoked to pass mock web service responses back to the production code:

@when('the communications service receives a response from TestCube Web Service')
def step_impl(context):
    context.mock_received_response_subject.on_next(context.text)

I use a click 'result callback' function that gets invoked at the end of the CLI invocation and blocks until the coroutine (the subcommand) is done:

@cli.resultcallback()
def _handle_command_task(task: Coroutine, **_) -> None:
    if task:
        loop = asyncio.get_event_loop()
        result = loop.run_until_complete(task)
        loop.close()
        print('RESULT:', result) 

Problem

At the start of the test, I run CliRunner.invoke to fire off the whole shebang. The problem is that this is a blocking call and will block the thread until the CLI has finished and returned a result, which isn't helpful if I need my test thread to carry on so it can produce mock web service responses concurrently with it.

What I guess I need to do is run CliRunner.invoke on a new thread using ThreadPoolExecutor. This allows the test logic to continue on the original thread and execute the @when step posted above. However, notifications published with mock_received_response_subject.on_next do not seem to trigger execution to continue within the subcommand.
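The first half of that idea (keeping the test thread free while a blocking call runs elsewhere) can be sketched with the standard library alone. This is only an illustration: blocking_cli is a hypothetical stand-in for CliRunner.invoke, and a thread-safe queue.Queue replaces the RxPY subject, which is why the hand-off works here without any event loop involved.

```python
import queue
from concurrent.futures import ThreadPoolExecutor


def blocking_cli(responses: queue.Queue) -> str:
    # Hypothetical stand-in for CliRunner.invoke: blocks until a response arrives.
    return "RESULT: " + responses.get(timeout=5)


def run_test() -> str:
    responses = queue.Queue()
    with ThreadPoolExecutor(max_workers=1) as pool:
        # The blocking call runs on a worker thread...
        future = pool.submit(blocking_cli, responses)
        # ...so the test thread is free to produce the mock response.
        responses.put("foo")
        return future.result(timeout=5)


output = run_test()
print(output)
```

The queue is safe to use from any thread; an asyncio event loop is not, which is where the real code runs into trouble.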

I believe the solution would involve making use of RxPY's AsyncIOScheduler, but I'm finding the documentation on this a little sparse and unhelpful.

SSCCE

The snippet below captures what I hope is the essence of the problem. If it can be modified to work, I should be able to apply the same solution to my actual code to get it to behave as I want.

import asyncio
import logging
import sys
import time

import click
from click.testing import CliRunner
from rx.subjects import Subject

web_response_subject = Subject()
web_response_observable = web_response_subject.as_observable()

thread_loop = asyncio.new_event_loop()


@click.group()
def cli():
    asyncio.set_event_loop(thread_loop)


@cli.resultcallback()
def result_handler(task, **_):
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(task) # Should block until subject publishes value
    loop.close()

    print(result)


@cli.command()
async def get_web_response():
    return await web_response_observable


def test():
    runner = CliRunner()
    future = thread_loop.run_in_executor(None, runner.invoke, cli, ['get_web_response'])
    time.sleep(1)
    web_response_subject.on_next('foo') # Simulate reception of web response.
    time.sleep(1)
    result = future.result()
    print(result.output)

logging.basicConfig(
    level=logging.DEBUG,
    format='%(threadName)10s %(name)18s: %(message)s',
    stream=sys.stderr,
)

test()

Current Behaviour

The program hangs when run, blocking at result = loop.run_until_complete(task).

Acceptance Criteria

The program terminates and prints foo on stdout.

Update 1

Based on Vincent's help I've made some changes to my code.

Relay.enabled (the subcommand that awaits responses from the web service in order to process them) is now implemented like this:

async def enabled(self, request: str, response_handler: Callable[[str], Tuple[bool, str]]) -> Optional[str]:
    self._generate_request(request)

    if response_handler is None:
        return None

    return await self.on_response \
        .select(response_handler) \
        .where(lambda result, i: result[0]) \
        .select(lambda result, index: result[1]) \
        .first()

I wasn't quite sure how await would behave with RxPY observables: would it return execution to the caller on each element generated, or only once the observable has completed (or errored)? I now know it's the latter, which honestly feels like the more natural choice and has allowed me to make the implementation of this function feel a lot more elegant and reactive.
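To make that behaviour concrete, here is a minimal sketch in plain asyncio. LastValueSubject is a hypothetical stand-in written for this illustration, not RxPY's implementation; it only mimics the observed semantics, where the awaited future stays pending while values are emitted and resolves with the last value on completion.

```python
import asyncio


class LastValueSubject:
    """Hypothetical stand-in for an Rx subject; not RxPY's actual API."""

    def __init__(self):
        self._future = None
        self._last = None

    def to_future(self, loop):
        self._future = loop.create_future()
        return self._future

    def on_next(self, value):
        # Emitting a value only records it; the future stays pending.
        self._last = value

    def on_completed(self):
        # Completion is what resolves the future, with the last value seen.
        if self._future is not None and not self._future.done():
            self._future.set_result(self._last)


async def main():
    loop = asyncio.get_running_loop()
    subject = LastValueSubject()
    future = subject.to_future(loop)
    subject.on_next("first")
    subject.on_next("second")
    await asyncio.sleep(0)
    pending_after_values = not future.done()
    subject.on_completed()
    return pending_after_values, await future


pending_after_values, awaited_value = asyncio.run(main())
print(pending_after_values, awaited_value)
```

This is also why ending the chain with first() helps in the real code: it completes the observable as soon as one matching value arrives.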

I've also modified the test step that generates mock web service responses:

@when('the communications service receives a response from TestCube Web Service')
def step_impl(context):
    loop = asyncio.get_event_loop()
    loop.call_soon_threadsafe(context.mock_received_response_subject.on_next, context.text)

Unfortunately, this will not work as it stands, since the CLI is being invoked in its own thread...

@when('the CLI is run with "{arguments}"')
def step_impl(context, arguments):
    loop = asyncio.get_event_loop()
    if 'async.cli' in context.tags:
        context.async_result = loop.run_in_executor(None, context.cli_runner.invoke, testcube.cli, arguments.split())
    else:
        ...

And the CLI creates its own thread-private event loop when invoked...

def cli(context, hostname, port):
    _initialize_logging(context.meta['click_log.core.logger']['level'])

    # Create a new event loop for processing commands asynchronously on.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    ...

What I think I need is a way to allow my test steps to invoke the CLI on a new thread and then fetch the event loop it's using:

@when('the communications service receives a response from TestCube Web Service')
def step_impl(context):
    loop = _get_cli_event_loop() # Needs to be implemented.
    loop.call_soon_threadsafe(context.mock_received_response_subject.on_next, context.text)

Update 2

There doesn't seem to be an easy way to get the event loop that a particular thread creates and uses for itself, so instead I took Vincent's advice and mocked asyncio.new_event_loop to return an event loop that my test code creates and stores:

def _apply_mock_event_loop_patch(context):
    # Close any already-existing exit stacks.
    if hasattr(context, 'mock_event_loop_exit_stack'):
        context.mock_event_loop_exit_stack.close()

    context.test_loop = asyncio.new_event_loop()
    print(context.test_loop)
    context.mock_event_loop_exit_stack = ExitStack()
    context.mock_event_loop_exit_stack.enter_context(
        patch.object(asyncio, 'new_event_loop', spec=True, return_value=context.test_loop))
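
Stripped of the behave context, the patching idea can be checked in isolation. In this sketch cli_entry_point is a hypothetical stand-in for the click group callback, and a with block replaces the ExitStack for brevity.

```python
import asyncio
from unittest.mock import patch


def cli_entry_point():
    # Hypothetical stand-in for the CLI group: asks asyncio for a fresh loop.
    return asyncio.new_event_loop()


original = asyncio.new_event_loop
test_loop = asyncio.new_event_loop()

with patch.object(asyncio, "new_event_loop", return_value=test_loop):
    # While the patch is active, the CLI receives the loop the test owns.
    loop_seen_by_cli = cli_entry_point()

print(loop_seen_by_cli is test_loop)
# Leaving the with block restores the real function.
print(asyncio.new_event_loop is original)
test_loop.close()
```

The patch only takes effect because the code under test looks up asyncio.new_event_loop at call time; a module that did `from asyncio import new_event_loop` would need to be patched where that name lives.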

I change my 'mock web response received' test step to do the following:

@when('the communications service receives a response from TestCube Web Service')
def step_impl(context):
    loop = context.test_loop
    loop.call_soon_threadsafe(context.mock_received_response_subject.on_next, context.text)

The great news is that I'm actually getting the Relay.enabled coroutine to trigger when this step gets executed!

The only problem now is the final test step, in which I await the future I got from executing the CLI in its own thread and validate that the CLI prints the expected output to stdout:

@then('the CLI should print "{output}"')
def step_impl(context, output):
    if 'async.cli' in context.tags:
        loop = asyncio.get_event_loop() # main loop, not test loop
        result = loop.run_until_complete(context.async_result)
    else:
        result = context.result
    assert_that(result.output, equal_to(output))

I've tried playing around with this but I can't seem to get context.async_result (which stores the future from loop.run_in_executor) to transition nicely to done and return the result. With the current implementation, I get an error for the first test (1.1) and indefinite hanging for the second (1.2):

 @mock.comms @async.cli @wip
  Scenario Outline: Querying relay enable state -- @1.1                           # testcube/tests/features/relay.feature:45
    When the user queries the enable state of relay 0                             # testcube/tests/features/steps/relay.py:17 0.003s
    Then the CLI should query the web service about the enable state of relay 0   # testcube/tests/features/steps/relay.py:48 0.000s
    When the communications service receives a response from TestCube Web Service # testcube/tests/features/steps/core.py:58 0.000s
      """
      {'module':'relays','path':'relays[0].enabled','data':[True]}'
      """
    Then the CLI should print "True"                                              # testcube/tests/features/steps/core.py:94 0.003s
      Traceback (most recent call last):
        File "/Users/davidfallah/testcube_env/lib/python3.5/site-packages/behave/model.py", line 1456, in run
          match.run(runner.context)
        File "/Users/davidfallah/testcube_env/lib/python3.5/site-packages/behave/model.py", line 1903, in run
          self.func(context, *args, **kwargs)
        File "testcube/tests/features/steps/core.py", line 99, in step_impl
          result = loop.run_until_complete(context.async_result)
        File "/usr/local/Cellar/python3/3.5.2_1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
          return future.result()
        File "/usr/local/Cellar/python3/3.5.2_1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py", line 274, in result
          raise self._exception
        File "/usr/local/Cellar/python3/3.5.2_1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/concurrent/futures/thread.py", line 55, in run
          result = self.fn(*self.args, **self.kwargs)
        File "/Users/davidfallah/testcube_env/lib/python3.5/site-packages/click/testing.py", line 299, in invoke
          output = out.getvalue()
      ValueError: I/O operation on closed file.

      Captured stdout:
      RECEIVED WEB RESPONSE: {'module':'relays','path':'relays[0].enabled','data':[True]}'
      <Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/Cellar/python3/3.5.2_1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py:431]>

  @mock.comms @async.cli @wip
  Scenario Outline: Querying relay enable state -- @1.2                           # testcube/tests/features/relay.feature:46
    When the user queries the enable state of relay 1                             # testcube/tests/features/steps/relay.py:17 0.005s
    Then the CLI should query the web service about the enable state of relay 1   # testcube/tests/features/steps/relay.py:48 0.001s
    When the communications service receives a response from TestCube Web Service # testcube/tests/features/steps/core.py:58 0.000s
      """
      {'module':'relays','path':'relays[1].enabled','data':[False]}'
      """
RECEIVED WEB RESPONSE: {'module':'relays','path':'relays[1].enabled','data':[False]}'
    Then the CLI should print "False"                                             # testcube/tests/features/steps/core.py:94

Chapter 3: Finale

Screw all this asynchronous multi-threaded stuff, I'm too dumb for it.

First off, instead of describing the scenario like this...

When the user queries the enable state of relay <relay_id>
Then the CLI should query the web service about the enable state of relay <relay_id>
When the communications service receives a response from TestCube Web Service:
  """
  {"module":"relays","path":"relays[<relay_id>].enabled","data":[<relay_enabled>]}
  """
Then the CLI should print "<relay_enabled>"

We describe it like this:

Given the communications service will respond to requests:
  """
  {"module":"relays","path":"relays[<relay_id>].enabled","data":[<relay_enabled>]}
  """
When the user queries the enable state of relay <relay_id>
Then the CLI should query the web service about the enable state of relay <relay_id>
And the CLI should print "<relay_enabled>"

Implement the new given step:

@given('the communications service will respond to requests')
def step_impl(context):
    response = context.text

    def publish_mock_response(_):
        loop = context.test_loop
        loop.call_soon_threadsafe(context.mock_received_response_subject.on_next, response)

    # Configure the mock comms service to publish a mock response when a request is made.
    instance = context.mock_comms.return_value
    instance.send_request.on_next.side_effect = publish_mock_response
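
The mechanism behind this step can be reproduced outside behave. The sketch below assumes nothing about the real CommunicationService: mock_comms is a bare MagicMock, query_relay is a hypothetical coroutine standing in for the subcommand, and a plain asyncio future replaces the RxPY subject.

```python
import asyncio
from unittest.mock import MagicMock

loop = asyncio.new_event_loop()
response_future = loop.create_future()
mock_comms = MagicMock()


def publish_mock_response(_request):
    # Runs whenever the code under test sends a request; schedules the reply.
    loop.call_soon_threadsafe(response_future.set_result, '{"data":[true]}')


mock_comms.send_request.on_next.side_effect = publish_mock_response


async def query_relay():
    # Hypothetical stand-in for the subcommand: send a request, await the reply.
    mock_comms.send_request.on_next("relays[0].enabled")
    return await response_future


result = loop.run_until_complete(query_relay())
loop.close()
print(result)
```

Because the response is triggered by the request itself, the test no longer needs a second thread to inject it at the right moment.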

BOOM

2 features passed, 0 failed, 0 skipped
22 scenarios passed, 0 failed, 0 skipped
58 steps passed, 0 failed, 0 skipped, 0 undefined
Took 0m0.111s
asked Sep 04 '16 15:09 by Tagc


1 Answer

I can see two problems with your code:

  • asyncio is not thread-safe unless you use call_soon_threadsafe or run_coroutine_threadsafe. RxPY doesn't use either of those in Observable.to_future, so you have to access RxPY objects from the same thread that runs the asyncio event loop.
  • RxPY sets the result of the future when on_completed is called, so awaiting an observable returns the last object emitted. This means you have to call both on_next and on_completed to get await to return.

Here is a working example:

import click
import asyncio
from rx.subjects import Subject
from click.testing import CliRunner

web_response_subject = Subject()
web_response_observable = web_response_subject.as_observable()
main_loop = asyncio.get_event_loop()

@click.group()
def cli():
    pass

@cli.resultcallback()
def result_handler(task, **_):
    future = asyncio.run_coroutine_threadsafe(task, main_loop)
    print(future.result())

@cli.command()
async def get_web_response():
    return await web_response_observable

def test():
    runner = CliRunner()
    future = main_loop.run_in_executor(
        None, runner.invoke, cli, ['get_web_response'])
    main_loop.call_later(1, web_response_subject.on_next, 'foo')
    main_loop.call_later(2, web_response_subject.on_completed)
    result = main_loop.run_until_complete(future)
    print(result.output, end='')

if __name__ == '__main__':
    test()
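
For readers without click or RxPY installed, the threading pattern used above can be tried with the standard library alone. This is a sketch under assumptions: worker is a hypothetical stand-in for the thread that runs the CLI, and an asyncio.Event replaces the subject. As in the example, the worker thread hands its coroutine to the main loop with run_coroutine_threadsafe, and everything that touches asyncio objects happens on the loop's own thread.

```python
import asyncio
import threading


async def wait_for_response(event: asyncio.Event) -> str:
    await event.wait()
    return "foo"


def worker(loop, coro, out):
    # Foreign thread: submit the coroutine to the main loop and block on it.
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    out.append(future.result(timeout=5))


async def main():
    loop = asyncio.get_running_loop()
    event = asyncio.Event()
    out = []
    thread = threading.Thread(
        target=worker, args=(loop, wait_for_response(event), out))
    thread.start()
    # The "response" is published from the loop's own thread.
    loop.call_later(0.1, event.set)
    # Join in the executor so the loop itself is never blocked.
    await loop.run_in_executor(None, thread.join)
    return out


out = asyncio.run(main())
print(out)
```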
answered Sep 25 '22 16:09 by Vincent