Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Reactive Extensions Timeout that doesn't stop sequence?

I'm trying to make an IObservable<bool> that returns true if a UDP Message has been received in the last 5 seconds and if a timeout occurs, a false is returned.

So far I have this:

public IObservable<Boolean> GettingUDPMessages(IPEndPoint localEP)
{
    var udp = BaseComms.UDPBaseStringListener(localEP)
        .Where(msg => msg.Data.Contains("running"))
        .Select(s => true);

    return Observable
        .Timeout(udp, TimeSpan.FromSeconds(5))
        .Catch(Observable.Return(false));
}

The issues with this are:-

  • Once a false is returned, the sequence stops
  • I only really need true or false on state changes.

I could use a Subject<T> but I need to be careful to dispose of the UDPBaseStringListener observable when there are no more subscribers.

Update

Every time I get a UDP message I would like it to return a true. If I haven't received a UDP message in the last 5 seconds, I would like it to return a false.

like image 473
Tim Avatar asked Nov 09 '11 11:11

Tim


4 Answers

As pointed out by Bj0, the solution with BufferWithTime will not return the data point as soon as it is received and the buffer timeout is not reset after receiving a data point.

With Rx Extensions 2.0, your can solve both problems with a new Buffer overload accepting both a timeout and a size:

static IObservable<Boolean> GettingUDPMessages(IPEndPoint localEP)
{
    return BaseComms
        .UDPBaseStringListener(localEP)
        .Where(msg => msg.Data.Contains("running"))
        .Buffer(TimeSpan.FromSeconds(5), 1)
        .Select(s => s.Count > 0)
        .DistinctUntilChanged();
}
like image 106
Sébastien Lorion Avatar answered Nov 11 '22 17:11

Sébastien Lorion


The problem with buffer is that the "timeout" interval doesn't get reset when you get a new value, the buffer windows are just slices of time (5s in this case) that follow each other. This means that, depending on when you receive your last value, you may have to wait for almost double the timeout value. This can also miss timeouts:

               should timeout here
                         v
0s         5s         10s        15s
|x - x - x | x - - - - | - - - x -| ...
          true        true       true

IObservable.Throttle, however, resets itself each time a new value comes in and only produces a value after the timespan has elapsed (the last incoming value). This can be used as a timeout and merged with the IObservable to insert "timeout" values into the stream:

var obs = BaseComms.UDPBaseStringListener(localEP)
            .Where(msg => msg.Data.Contains("running"));

return obs.Merge( obs.Throttle( TimeSpan.FromSeconds(5) )
                        .Select( x => false ) )
            .DistinctUntilChanged();

A working LINQPad example:

var sub = new Subject<int>();

var script = sub.Timestamp()
    .Merge( sub.Throttle(TimeSpan.FromSeconds(2)).Select( i => -1).Timestamp())
    .Subscribe( x => 
{
    x.Dump("val");
});


Thread.Sleep(1000);

sub.OnNext(1);
sub.OnNext(2);

Thread.Sleep(10000);

sub.OnNext(5);

A -1 is inserted into the stream after a 2s timeout.

like image 42
bj0 Avatar answered Nov 11 '22 17:11

bj0


I would suggest avoiding the use of Timeout - it causes exceptions and coding with exceptions is bad.

Also, it seems to only make sense that your observable stops after one value. You might need to explain more as to what you want the behaviour to be.

My current solution to your problem is:

public IObservable<Boolean> GettingUDPMessages(IPEndPoint localEP)
{
    return Observable.Create<bool>(o =>
    {
        var subject = new AsyncSubject<bool>();
        return new CompositeDisposable(
            Observable.Amb(
                BaseComms
                    .UDPBaseStringListener(localEP)
                    .Where(msg => msg.Data.Contains("running"))
                    .Select(s => true),
                Observable
                    .Timer(TimeSpan.FromMilliseconds(10.0))
                    .Select(_ => false)
            ).Take(1).Subscribe(subject), subject.Subscribe(o));
    });
}

Does that help?

like image 1
Enigmativity Avatar answered Nov 11 '22 16:11

Enigmativity


If you don't want the sequence to stop, just wrap it in Defer + Repeat:

Observable.Defer(() => GettingUDPMessages(endpoint)
    .Repeat();
like image 1
Ana Betts Avatar answered Nov 11 '22 15:11

Ana Betts