Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Asynchronous message processing with async/await, RX and LINQ

I'm using Reactive Extensions in combination with async/await to simplify my socket protocol implementation. There's some actions that have to be performed when specific message arrives (e.g. send 'pong' to each 'ping' -message) and there's also some method's where we have to wait for some specific responses asynchronously. The following example illustrates this:

private Subject<string> MessageReceived = new Subject<string>();

//this method gets called every time a message is received from socket
internal void OnReceiveMessage(string message)
{
    MessageReceived.OnNext(message);
    ProcessMessage(message);
}

public async Task<string> TestMethod()
{
    var expectedMessage = MessageReceived.Where(x => x.EndsWith("D") && x.EndsWith("F")).FirstOrDefaultAsync();
    await SendMessage("ABC");

    //some code...

    //if response we are waiting for comes before next row, we miss it
    return await expectedMessage;
}

TestMethod() sends "ABC" to the socket and continues when for example "DEF" is received (there might be some other messages before that).

This works almost, but there's a race condition. It seems that this code won't listen for messages until return await expectedMessage; And this is a problem, since sometimes the message arrives before that.

like image 253
Juha Avatar asked May 19 '26 14:05

Juha


1 Answers

FirstOrDefaultAsync won't work here nicely: It doesn't subscribe until the await line, which leaves you with a race condition (as you point out). Here's how you can replace it:

    var expectedMessage = MessageReceived
        .Where(x => x.EndsWith("D") && x.EndsWith("F"))
        .Take(1)
        .Replay(1)
        .RefCount();

    using (var dummySubscription = expectedMessage.Subscribe(i => {}))
    {
        await SendMessage("ABC");

        //Some code... goes here.

        return await expectedMessage;
    }

.Replay(1) makes sure that a new subscription gets the most recent entry, assuming one exists. It only works though if there's a subscriber listening, hence dummySubscription.

like image 57
Shlomo Avatar answered May 21 '26 03:05

Shlomo