I'm using Reactive Extensions in combination with async/await to simplify my socket protocol implementation. Some actions have to be performed when a specific message arrives (e.g. send 'pong' in reply to each 'ping' message), and there are also some methods where we have to wait asynchronously for a specific response. The following example illustrates this:
private Subject<string> MessageReceived = new Subject<string>();

// This method gets called every time a message is received from the socket
internal void OnReceiveMessage(string message)
{
    MessageReceived.OnNext(message);
    ProcessMessage(message);
}

public async Task<string> TestMethod()
{
    var expectedMessage = MessageReceived.Where(x => x.StartsWith("D") && x.EndsWith("F")).FirstOrDefaultAsync();
    await SendMessage("ABC");
    // some code...
    // if the response we are waiting for arrives before the next line, we miss it
    return await expectedMessage;
}
TestMethod() sends "ABC" to the socket and continues when, for example, "DEF" is received (there might be other messages before that).
This almost works, but there's a race condition. It seems that this code doesn't start listening for messages until it reaches return await expectedMessage; and this is a problem, since the message sometimes arrives before that.
FirstOrDefaultAsync doesn't work well here: the observable it returns isn't subscribed to until the await line, which leaves you with the race condition you describe. Here's how you can replace it:
var expectedMessage = MessageReceived
    .Where(x => x.StartsWith("D") && x.EndsWith("F"))
    .Take(1)
    .Replay(1)
    .RefCount();

// Subscribe before sending so the reply is captured even if it arrives early
using (var dummySubscription = expectedMessage.Subscribe(i => {}))
{
    await SendMessage("ABC");
    // Some code... goes here.
    return await expectedMessage;
}
.Replay(1) buffers the most recent item and replays it to any later subscription, assuming one exists. Because of .RefCount(), the buffer is only connected to MessageReceived while there's at least one subscriber listening, hence dummySubscription.
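To try the idea in isolation, here is a minimal sketch, assuming the System.Reactive NuGet package is referenced. ReplayDemo and RunAsync are hypothetical names, a local Subject stands in for MessageReceived, and the OnNext calls simulate messages arriving from the socket before the await is reached.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original protocol code.
public static class ReplayDemo
{
    public static async Task<string> RunAsync()
    {
        // Stand-in for the MessageReceived subject from the question.
        var messageReceived = new Subject<string>();

        var expectedMessage = messageReceived
            .Where(x => x.StartsWith("D") && x.EndsWith("F"))
            .Take(1)      // complete after the first matching message
            .Replay(1)    // remember that message for later subscribers
            .RefCount();  // connect to the source on the first subscription

        // The dummy subscription connects the replay buffer right away.
        using (expectedMessage.Subscribe(_ => { }))
        {
            // Simulate the reply arriving before we get to the await.
            messageReceived.OnNext("XYZ"); // filtered out by Where
            messageReceived.OnNext("DEF"); // matches and is buffered

            // The await subscribes late, so it relies on the replayed value.
            return await expectedMessage;
        }
    }
}
```

If the dummy subscription is removed, nothing is connected when the simulated messages are pushed, so the late await has nothing to replay and would wait for a message that never comes.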