How to achieve a sequence of timeouts with Rx?

The scenario is as follows: a device communicating over is considered connected if it makes a callback to the server within a short period of time. I want to create a class that encapsulates tracking this status. On each call to the device, the timeout should be reset. On callback, the connection is confirmed and the status should be set to true; if the callback times out, the status should be set to false. The next call should still be able to reset the timeout, regardless of the current status.

I was thinking of achieving this with Rx using Switch and Timeout, but it stops working after the first timeout and I don't know why.

public class ConnectionStatus
{
    private Subject<bool> pending = new Subject<bool>();
    private Subject<bool> connected = new Subject<bool>();

    public bool IsConnected { get; private set; }

    public ConnectionStatus(CancellationToken token, short timeoutSeconds = 15)
    {
        pending.Select(outer => connected.Timeout(TimeSpan.FromSeconds(timeoutSeconds)))
            .Switch()
            .Subscribe(_ => IsConnected = true, e => IsConnected = false, token);
    }

    public void ConfirmConnected()
    {
        connected.OnNext(true);
    }

    public void SetPending()
    {
        pending.OnNext(true);
    }
}

This is the "test case":

var c = new ConnectionStatus(default(CancellationToken));

c.SetPending();
await Task.Delay(TimeSpan.FromSeconds(5));
c.ConfirmConnected();   
c.IsConnected.Dump(); // TRUE, OK

c.SetPending();
await Task.Delay(TimeSpan.FromSeconds(5));
c.ConfirmConnected();
c.IsConnected.Dump(); // TRUE, OK

c.SetPending();
await Task.Delay(TimeSpan.FromSeconds(20));
c.IsConnected.Dump(); // FALSE, OK
c.ConfirmConnected(); 
c.IsConnected.Dump(); // FALSE, OK

c.SetPending();
await Task.Delay(TimeSpan.FromSeconds(10));
c.ConfirmConnected(); 
c.IsConnected.Dump(); // FALSE, NOT OK!

I assume that the timeout of the inner observable also stops the outer observable, because the outer => lambda is no longer called after the first timeout. What is the correct way to do this?

Thank you

ZorgoZ asked Jan 04 '23 04:01


2 Answers

Here's an alternative way to produce the stream of IsConnected values without using Timeout:

public class ConnectionStatus
{
    private Subject<Unit> pending = new Subject<Unit>();
    private Subject<Unit> connected = new Subject<Unit>();

    public bool IsConnected { get; private set; }

    public ConnectionStatus(CancellationToken token, short timeoutSeconds = 15)
    {
        pending
            .Select(outer =>
                Observable.Amb(
                    connected.Select(_ => true),
                    Observable.Timer(TimeSpan.FromSeconds(timeoutSeconds)).Select(_ => false)))
            .Switch()
            .Subscribe(isConnected => IsConnected = isConnected, token);
    }

    public void ConfirmConnected()
    {
        connected.OnNext(Unit.Default);
    }

    public void SetPending()
    {
        pending.OnNext(Unit.Default);
    }
}

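The Observable.Amb operator mirrors whichever of its source observables produces a notification first and unsubscribes from the other. Here that means each pending call yields true if connected fires before the timer, and false otherwise. This is preferable to using exceptions for control flow.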
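
To check the Amb version against the failing case from the question without waiting in real time, one option is to pass a virtual-time scheduler to Observable.Timer. The following is only a sketch under assumptions: it assumes the System.Reactive package is referenced, it inlines the pipeline instead of using the ConnectionStatus class, and the AmbVirtualTimeDemo name and the use of HistoricalScheduler are illustrative choices, not part of the answer above.

```csharp
using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class; mirrors the Amb pipeline above in virtual time.
public static class AmbVirtualTimeDemo
{
    public static bool[] Run()
    {
        var scheduler = new HistoricalScheduler();   // clock only moves when advanced
        var pending = new Subject<Unit>();
        var connected = new Subject<Unit>();
        var timeout = TimeSpan.FromSeconds(15);
        var isConnected = false;

        var subscription = pending
            .Select(outer => Observable.Amb(
                connected.Select(_ => true),
                Observable.Timer(timeout, scheduler).Select(_ => false)))
            .Switch()
            .Subscribe(value => isConnected = value);

        // Callback arrives within the timeout.
        pending.OnNext(Unit.Default);
        scheduler.AdvanceBy(TimeSpan.FromSeconds(5));
        connected.OnNext(Unit.Default);
        var afterConfirm = isConnected;

        // No callback for 20 virtual seconds: the timer wins.
        pending.OnNext(Unit.Default);
        scheduler.AdvanceBy(TimeSpan.FromSeconds(20));
        var afterTimeout = isConnected;

        // A late callback is ignored because Amb already chose the timer.
        connected.OnNext(Unit.Default);
        var afterLateCallback = isConnected;

        // The next call resets the timeout, and the callback is honoured again.
        pending.OnNext(Unit.Default);
        scheduler.AdvanceBy(TimeSpan.FromSeconds(10));
        connected.OnNext(Unit.Default);
        var afterRecovery = isConnected;

        subscription.Dispose();
        return new[] { afterConfirm, afterTimeout, afterLateCallback, afterRecovery };
    }
}
```

Run() should return true, false, false, true, which corresponds to the four steps of the test case in the question, including the last one that previously failed.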

Enigmativity answered Jan 11 '23 19:01


The problem is that Timeout signals the timeout as an exception, delivered through OnError, and that terminates the Rx subscription. Because Switch propagates an inner OnError to the outer sequence, the whole pipeline ends after the first timeout (as you have it coded), and no further notifications are sent. The Rx grammar allows zero or more OnNext messages followed by at most one OnCompleted or one OnError. After the OnError from the Timeout is sent, you'll see no more messages.
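
The termination can be reproduced without any timers. The sketch below assumes the System.Reactive package is referenced; the SwitchErrorDemo name is hypothetical, and the TimeoutException is raised by hand to stand in for what Timeout would send.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class: shows that an inner OnError ends a Switch pipeline.
public static class SwitchErrorDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var outer = new Subject<IObservable<int>>();

        outer.Switch().Subscribe(
            x => log.Add($"OnNext({x})"),
            e => log.Add($"OnError({e.GetType().Name})"),
            () => log.Add("OnCompleted"));

        var first = new Subject<int>();
        outer.OnNext(first);
        first.OnNext(1);
        first.OnError(new TimeoutException());   // stands in for Timeout firing

        // The subscription has already terminated, so this inner
        // sequence is never subscribed to and its value is never seen.
        var second = new Subject<int>();
        outer.OnNext(second);
        second.OnNext(2);

        return log;
    }
}
```

The returned log contains only OnNext(1) and OnError(TimeoutException); the value 2 from the second inner sequence never arrives, which matches the last step of the test case in the question.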

You need to have the timeout delivered as an OnNext message instead of an OnError message. In your original code, you turned any OnError into false and any OnNext into true. Instead, you need to embed the new IsConnected value in the OnNext messages themselves. Here's how to do that:

public ConnectionStatus(CancellationToken token, short timeoutSeconds = 15)
{
    pending.Select(_ => connected
            .Timeout(TimeSpan.FromSeconds(timeoutSeconds))
            .Materialize()
            .Select(n => n.Kind == NotificationKind.OnError && n.Exception is TimeoutException
                ? Notification.CreateOnNext(false)
                : n)
            .Dematerialize()
            .Take(1)
        )
        .Switch()
        .Subscribe(b => IsConnected = b, token);
}
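
The same idea, turning the timeout into an OnNext(false), can also be expressed with the Catch operator instead of Materialize/Dematerialize. This is a swapped-in variant, not the code from this answer, and only a sketch: it assumes the System.Reactive package is referenced, uses Unit-typed sources, and the ConnectionStreams.WithCatch helper and its scheduler parameter are hypothetical names introduced for illustration.

```csharp
using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

// Hypothetical helper, not part of the original answer.
public static class ConnectionStreams
{
    // Emits true when 'connected' fires within 'timeout' of a 'pending'
    // notification, and false when the timeout elapses first.
    public static IObservable<bool> WithCatch(
        IObservable<Unit> pending,
        IObservable<Unit> connected,
        TimeSpan timeout,
        IScheduler scheduler)
    {
        return pending
            .Select(outer => connected
                .Select(_ => true)
                .Timeout(timeout, scheduler)
                // Replace only the TimeoutException with a regular value;
                // any other error still propagates as OnError.
                .Catch<bool, TimeoutException>(_ => Observable.Return(false))
                .Take(1))
            .Switch();
    }
}
```

Passing Scheduler.Default as the scheduler gives real-time behaviour, while a virtual-time scheduler such as HistoricalScheduler makes the timeouts testable without delays.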

Shlomo answered Jan 11 '23 19:01