The scenario is as follows: a device communicating over is considered connected if it makes a callback to the server within a short period of time. I want to create a class that encapsulates the functionality of keeping track of this status. On call to the device, the timeout should be reset. On callback, the connection is confirmed, and the status should be set to true, if the callback times out, it should be set to false. But the next call should be able to reset the timeout again indifferent to the current status.
I was thinking to achieve this with RX using swith
and timeout
. But I don't know why it stops working.
public class ConnectionStatus
{
private Subject<bool> pending = new Subject<bool>();
private Subject<bool> connected = new Subject<bool>();
public bool IsConnected { get; private set; }
public ConnectionStatus(CancellationToken token, short timeoutSeconds = 15)
{
pending.Select(outer => connected.Timeout(TimeSpan.FromSeconds(timeoutSeconds)))
.Switch()
.Subscribe(_ => IsConnected = true, e => IsConnected = false, token);
}
public void ConfirmConnected()
{
connected.OnNext(true);
}
public void SetPending()
{
pending.OnNext(true);
}
}
This is the "test case":
var c = new ConnectionStatus(default(CancellationToken));
c.SetPending();
await Task.Delay(TimeSpan.FromSeconds(5));
c.ConfirmConnected();
c.IsConnected.Dump(); // TRUE, OK
c.SetPending();
await Task.Delay(TimeSpan.FromSeconds(5));
c.ConfirmConnected();
c.IsConnected.Dump(); // TRUE, OK
c.SetPending();
await Task.Delay(TimeSpan.FromSeconds(20));
c.IsConnected.Dump(); // FALSE, OK
c.ConfirmConnected();
c.IsConnected.Dump(); // FALSE, OK
c.SetPending();
await Task.Delay(TimeSpan.FromSeconds(10));
c.ConfirmConnected();
c.IsConnected.Dump(); // FALSE, NOT OK!
I assume that the timeout of the inner observable is also stopping the outer observable. As the outer =>
lambda is not called anymore. What is the correct way?
Thank you
Here's an alternative way to produce the stream of IsConnected
values without using .TimeOut
:
public class ConnectionStatus
{
private Subject<Unit> pending = new Subject<Unit>();
private Subject<Unit> connected = new Subject<Unit>();
public bool IsConnected { get; private set; }
public ConnectionStatus(CancellationToken token, short timeoutSeconds = 15)
{
pending
.Select(outer =>
Observable.Amb(
connected.Select(_ => true),
Observable.Timer(TimeSpan.FromSeconds(timeoutSeconds)).Select(_ => false)))
.Switch()
.Subscribe(isConnected => IsConnected = isConnected, token);
}
public void ConfirmConnected()
{
connected.OnNext(Unit.Default);
}
public void SetPending()
{
pending.OnNext(Unit.Default);
}
}
The Observable.Amb
operator simply takes a value from whichever observable produces a value first - it's preferable to coding with exceptions.
The problem is that Timeout
essentially causes an Exception blowing up the Rx subscriptions. After the timeout is triggered (as you have it coded), no other notifications will be sent. Rx grammar is that you can have * OnNext
messages followed by either one OnCompleted
or one OnError
. After the OnError
from the Timeout
is sent, you'll see no more messages.
You need to have the Timeout message delivered via OnNext
messages instead of an OnError
message. In your old code, you turned any OnError
into a false, and any OnNext
into a true. Instead you need to embed the proper new IsConnected
value into OnNext
messages. Here's how to do that:
public ConnectionStatus(CancellationToken token, short timeoutSeconds = 15)
{
pending.Select(_ => connected
.Timeout(TimeSpan.FromSeconds(timeoutSeconds))
.Materialize()
.Select(n => n.Kind == NotificationKind.OnError && n.Exception.GetType() == typeof(TimeoutException)
? Notification.CreateOnNext(false)
: n)
.Dematerialize()
.Take(1)
)
.Switch()
.Subscribe(b => IsConnected = b, token);
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With