I'm trying to make an IObservable<bool> that returns true if a UDP message has been received in the last 5 seconds, and false if a timeout occurs.
So far I have this:
public IObservable<Boolean> GettingUDPMessages(IPEndPoint localEP)
{
    var udp = BaseComms.UDPBaseStringListener(localEP)
        .Where(msg => msg.Data.Contains("running"))
        .Select(s => true);
    return Observable
        .Timeout(udp, TimeSpan.FromSeconds(5))
        .Catch(Observable.Return(false));
}
The issues with this are:-
true or false on state changes. I could use a Subject<T>, but I need to be careful to dispose of the UDPBaseStringListener observable when there are no more subscribers.
Update
Every time I get a UDP message I would like it to return a true. If I haven't received a UDP message in the last 5 seconds, I would like it to return a false.
As pointed out by Bj0, the solution with BufferWithTime will not return the data point as soon as it is received, and the buffer timeout is not reset after receiving a data point.
With Rx Extensions 2.0, you can solve both problems with a new Buffer overload that accepts both a timeout and a size:
static IObservable<Boolean> GettingUDPMessages(IPEndPoint localEP)
{
    return BaseComms
        .UDPBaseStringListener(localEP)
        .Where(msg => msg.Data.Contains("running"))
        .Buffer(TimeSpan.FromSeconds(5), 1)
        .Select(s => s.Count > 0)
        .DistinctUntilChanged();
}
The problem with Buffer is that the "timeout" interval doesn't get reset when you get a new value; the buffer windows are just slices of time (5s in this case) that follow each other. This means that, depending on when you receive your last value, you may have to wait for almost double the timeout value. It can also miss timeouts:
                should timeout here
                         v
0s         5s          10s       15s
|x - x - x | x - - - - | - - - x -| ...
    true       true       true
Observable.Throttle, however, resets its timer each time a new value comes in and only emits a value (the last incoming one) once the timespan has elapsed without another value arriving. This can be used as a timeout and merged with the source observable to insert "timeout" values into the stream:
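var obs = BaseComms.UDPBaseStringListener(localEP)
    .Where(msg => msg.Data.Contains("running"))
    .Select(_ => true); // map messages to true so both merged streams are IObservable<bool>
// Publish shares a single subscription to the listener between Merge and Throttle.
return obs.Publish(shared => shared
        .Merge(shared.Throttle(TimeSpan.FromSeconds(5))
            .Select(_ => false)))
    .DistinctUntilChanged();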
A working LINQPad example:
var sub = new Subject<int>();
var script = sub.Timestamp()
    .Merge(sub.Throttle(TimeSpan.FromSeconds(2)).Select(i => -1).Timestamp())
    .Subscribe(x =>
    {
        x.Dump("val");
    });
Thread.Sleep(1000);
sub.OnNext(1);
sub.OnNext(2);
Thread.Sleep(10000);
sub.OnNext(5);
A -1 is inserted into the stream after a 2s timeout.
I would suggest avoiding the use of Timeout: it signals the timeout as a TimeoutException through OnError, and using exceptions for control flow is bad.
Also, it seems to only make sense that your observable stops after one value. You might need to explain more as to what you want the behaviour to be.
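If Timeout is still wanted, one way to sidestep the exception is the overload that takes a fallback sequence. The sketch below is only an illustration under assumptions: it needs the System.Reactive NuGet package, the Subject<bool> is a hypothetical stand-in for the filtered UDP stream, and the 1 second timeout is shortened for the demo.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

class TimeoutFallbackDemo
{
    static void Main()
    {
        // Hypothetical stand-in for the filtered UDP stream, already mapped to true.
        var messages = new Subject<bool>();

        // This Timeout overload switches to the fallback sequence on timeout
        // instead of signalling a TimeoutException through OnError.
        var status = messages.Timeout(TimeSpan.FromSeconds(1), Observable.Return(false));

        using (status.Subscribe(
            v => Console.WriteLine(v),
            ex => Console.WriteLine("error: " + ex.Message),
            () => Console.WriteLine("completed")))
        {
            messages.OnNext(true); // arrives before the timeout
            Thread.Sleep(2000);    // no message for longer than the 1 s timeout
        }
    }
}
```

Note that the sequence still ends once the fallback has produced its value, so this does not by itself give a long-running status stream.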
My current solution to your problem is:
public IObservable<Boolean> GettingUDPMessages(IPEndPoint localEP)
{
    return Observable.Create<bool>(o =>
    {
        var subject = new AsyncSubject<bool>();
        return new CompositeDisposable(
            // Amb forwards whichever source produces a value first.
            Observable.Amb(
                BaseComms
                    .UDPBaseStringListener(localEP)
                    .Where(msg => msg.Data.Contains("running"))
                    .Select(s => true),
                Observable
                    .Timer(TimeSpan.FromSeconds(5.0)) // the 5-second timeout from the question
                    .Select(_ => false)
            ).Take(1).Subscribe(subject), subject.Subscribe(o));
    });
}
Does that help?
If you don't want the sequence to stop, just wrap it in Defer + Repeat:
Observable.Defer(() => GettingUDPMessages(endpoint))
    .Repeat();
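To see how Defer + Repeat keeps a one-shot sequence going, here is a rough, self-contained sketch. It assumes the System.Reactive NuGet package; OneShot is a hypothetical stand-in for GettingUDPMessages that simulates the message with a timer, and the delays are shortened for the demo.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

class DeferRepeatDemo
{
    // Hypothetical one-shot check: true if the simulated message beats the timer, else false.
    static IObservable<bool> OneShot(TimeSpan messageDelay, TimeSpan timeout) =>
        Observable.Amb(
            Observable.Timer(messageDelay).Select(_ => true),
            Observable.Timer(timeout).Select(_ => false))
        .Take(1);

    static void Main()
    {
        var attempt = 0;
        var status = Observable
            .Defer(() =>
            {
                // Defer runs this factory again on every resubscription made by Repeat.
                // Odd attempts get a fast "message"; even attempts are slow and time out.
                var delay = ++attempt % 2 == 1 ? 100 : 2000;
                return OneShot(TimeSpan.FromMilliseconds(delay), TimeSpan.FromMilliseconds(500));
            })
            .Repeat()
            .Take(4); // stop the demo after four results

        var done = new ManualResetEventSlim();
        using (status.Subscribe(v => Console.WriteLine(v), () => done.Set()))
        {
            done.Wait(); // block until the four results have been printed
        }
    }
}
```

Each time the inner sequence completes, Repeat resubscribes and Defer builds a fresh one-shot sequence, so the results should alternate between True and False here.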