
How do I await a response from an RX Subject without introducing a race condition?

I have a service that allows a caller to send commands and receive responses asynchronously. In a real application, these actions are fairly disconnected (some action will send a command, and the responses will be processed independently).

However, in my tests, I need to be able to send a command and then wait for the (first) response before continuing the test.

The responses are published using RX, and my first attempt at the code was something like this:

service.SendCommand("BLAH");
await service.Responses.FirstAsync();

The problem with this is that FirstAsync only works if the response arrives after the await has been reached. If the service responds before that point, the response is missed and the test hangs on the await.

My next attempt to fix this was to call FirstAsync() prior to sending the command, so that it would have the result even if it arrived before awaiting:

var firstResponse = service.Responses.FirstAsync();
service.SendCommand("BLAH");
await firstResponse;

However, this still fails in the same way. It seems like it's only when the await is hit (GetAwaiter is called) that it starts listening; so the exact same race condition exists.
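
The suspicion above can be checked with a minimal sketch. It assumes the System.Reactive package; the class and method names are hypothetical and only serve the illustration. It uses Subject<T>.HasObservers to show that calling FirstAsync() by itself does not attach an observer, and that one is attached only once something subscribes (which is what awaiting does):

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class, not part of the service in the question.
public static class DeferredSubscriptionDemo
{
    // Returns whether the subject had observers before and after subscribing.
    public static (bool Before, bool After) Run()
    {
        var x = new Subject<bool>();

        // Building the query does not subscribe to the subject.
        var firstBool = x.FirstAsync();
        bool before = x.HasObservers;

        // Subscribing attaches an observer to the subject.
        using (firstBool.Subscribe(_ => { }))
        {
            bool after = x.HasObservers;
            return (before, after);
        }
    }
}
```

Any value pushed through the subject while Before still applies is never seen by the query, which would be consistent with the hang.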

If I change my Subject to a ReplaySubject with a buffer (or timer) then I can work around this; however, it doesn't make sense to do that in my production classes, as it would only be there for testing.

What's the "correct" way to be able to do this in RX? How can I set up something that will receive the first event on a stream in a way that won't introduce a race condition?

Here's a small test that illustrates the issue in a "single-threaded" way. This test will hang indefinitely:

[Fact]
public async Task MyTest()
{
    var x = new Subject<bool>();

    // Subscribe to the first bool (but don't await it yet)
    var firstBool = x.FirstAsync();

    // Send the first bool
    x.OnNext(true);

    // Await the task that receives the first bool
    var b = await firstBool; // <-- hangs here; presumably because firstBool didn't start monitoring until GetAwaiter was called?


    Assert.Equal(true, b);
}

I even tried calling Replay() in my test thinking it would buffer the results; but that doesn't change anything:

[Fact]
public async Task MyTest()
{
    var x = new Subject<bool>();

    var firstBool = x.Replay();

    // Send the first bool
    x.OnNext(true);

    // Await the task that receives the first bool
    var b = await firstBool.FirstAsync(); // <-- Still hangs here


    Assert.Equal(true, b);
}
Danny Tuppeny, asked Jun 27 '14 18:06


2 Answers

Great question, Danny. This troubles lots of people new to Rx.

Flagbug has an acceptable answer, but it would have been even easier to add a single line to your Replay() attempt:

var firstBool = x.Replay();
firstBool.Connect();   //Add this line, else your IConnectableObservable will never connect!
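
For reference, here is how the whole Replay() attempt might look with that line added. This is a sketch assuming the System.Reactive package, written as a plain static method rather than a test so it does not depend on any test framework; the class and method names are hypothetical:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

// Hypothetical demo class showing Replay() followed by Connect().
public static class ReplayConnectDemo
{
    public static async Task<bool> RunAsync()
    {
        var x = new Subject<bool>();

        // Replay() returns an IConnectableObservable<bool>.
        // It only subscribes to x (and starts buffering) once Connect() is called.
        var firstBool = x.Replay();

        using (firstBool.Connect())
        {
            // Send the first bool; the replay buffer records it.
            x.OnNext(true);

            // The late subscription receives the buffered value, so this completes.
            return await firstBool.FirstAsync();
        }
    }
}
```

Disposing the value returned by Connect() releases the subscription to the subject once the value has been read.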

This style of testing is OK. But there is another way, which in my experience is what people move to once they have used Rx for a bit longer. I suggest you just go straight to this version! But let's get there slowly...

(please excuse the switch back to NUnit as I don't have an xUnit runner on this PC)

Here we simply add values to a List<T> as they are produced. We can then check the contents of the list in our asserts:

[Test]
public void MyTest_with_List()
{
    var messages = new List<bool>();
    var x = new Subject<bool>();

    x.Subscribe(messages.Add);

    // Send the first bool
    x.OnNext(true);

    Assert.AreEqual(true, messages.Single());
}

For these super simple tests, that is OK, but we miss some fidelity around sequence termination, i.e. did it complete or error?
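
As a rough illustration of what that missing fidelity means, the list-based approach could be extended by hand to record termination as well. This is only a sketch assuming the System.Reactive package, with hypothetical class and method names; it uses the Subscribe overload that takes OnNext, OnError and OnCompleted callbacks:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

// Hypothetical demo class that records values, completion and errors by hand.
public static class TerminationRecordingDemo
{
    public static (List<bool> Values, bool Completed, Exception Error) Run()
    {
        var values = new List<bool>();
        var completed = false;
        Exception error = null;
        var x = new Subject<bool>();

        // Record each value, any error, and completion.
        using (x.Subscribe(values.Add, ex => error = ex, () => completed = true))
        {
            x.OnNext(true);
            x.OnCompleted();
        }

        return (values, completed, error);
    }
}
```

This works, but it is bookkeeping that every test would have to repeat.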

We can further extend this style of testing by using the testing tools for Rx (the Rx-Testing NuGet package). In this test we use the MockObserver/ITestableObserver<T> that we (annoyingly) get from a TestScheduler instance. Note that I have made the test fixture/class extend ReactiveTest, which provides the OnNext helper used in the assertion.

[TestCase(true)]
[TestCase(false)]
public void MyTest_with_TestObservers(bool expected)
{
    var observer = new TestScheduler().CreateObserver<bool>();
    var x = new Subject<bool>();

    x.Subscribe(observer);

    x.OnNext(expected);

    observer.Messages.AssertEqual(
        OnNext(0, expected));
}

This may seem like a small improvement, or even arguably a step backwards, given the need to create test schedulers and to specify the times at which we expect to see messages. However, as soon as you start introducing more complex Rx tests, this becomes very valuable.

You could further extend the test to generate your source sequence upfront and specify when the values will be played in virtual time. Here we drop the usage of the subject and specify that at 1000 ticks we will publish a value (expected). In the assertion, we again check the value and also the time at which the value was received. As we are now introducing virtual time, we also need to say when we want time to advance. We do that here by calling testScheduler.Start():

[TestCase(true)]
[TestCase(false)]
public void MyTest_with_TestObservables(bool expected)
{
    var testScheduler = new TestScheduler();
    var observer = testScheduler.CreateObserver<bool>();
    var source = testScheduler.CreateColdObservable(
        OnNext(1000, expected));

    source.Subscribe(observer);
    testScheduler.Start();

    observer.Messages.AssertEqual(
        OnNext(1000, expected));
}

I have written more about testing Rx here.

Lee Campbell, answered Jan 04 '23 23:01


You can do this with an AsyncSubject:

[Fact]
public async Task MyTest()
{
    var x = new Subject<bool>();

    var firstBool = x.FirstAsync().PublishLast(); // PublishLast wraps an AsyncSubject
    firstBool.Connect();

    // Send the first bool
    x.OnNext(true);

    // Await the task that receives the first bool
    var b = await firstBool;


    Assert.Equal(true, b);
}

AsyncSubject caches the last value received before OnCompleted is called and then replays that value to its subscribers, including any that subscribe later.
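
That caching behaviour can be seen in isolation with a small sketch, assuming the System.Reactive package (the class and method names are hypothetical). Two values are pushed, the subject completes, and only then does an observer subscribe:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

// Hypothetical demo class showing what a late subscriber to an AsyncSubject receives.
public static class AsyncSubjectDemo
{
    public static List<int> Run()
    {
        var received = new List<int>();
        var subject = new AsyncSubject<int>();

        subject.OnNext(1);
        subject.OnNext(2);

        // Nothing is emitted to observers until the subject completes.
        subject.OnCompleted();

        // A late subscriber still gets the cached last value.
        subject.Subscribe(received.Add);

        return received;
    }
}
```

The returned list contains only the value 2. In the test above, FirstAsync() completes right after the first value, which is what lets the AsyncSubject behind PublishLast() release it.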

Flagbug, answered Jan 05 '23 01:01