I have to query a database in a timely fashion to know the state of a legacy system. I've thought of wrapping the query around an Observable
, but I don't know the correct way to do it.
Basically, it will be the same query every 5 seconds. But I'm afraid I will have to face these problems:
Extra details:
SELECT
that returns a dataset with a list of status codes (working, faulted).I'm almost sure that the query should be executed in another thread, but I have no idea of how the observable should look like, ever having read Introduction to Rx by Lee Campbell.
This is a fairly classic case of using Rx to poll another system. Most people will use Observable.Interval
as their go-to operator, and for most it will be fine.
However you have specific requirements on timeouts and retry. In this case I think you are better off using a combination of operators:
Observable.Timer
to allow you to execute your query in a specified timeTimeout
to identify and database queries that have overrunToObservable()
to map your Task
results to an observable sequence.Retry
to allow you to recover after timeoutsRepeat
to allow you to continue after successful database queries. This will also keep that initial period/gap between the completion of the previous database query and the commencement of the next one.This working LINQPad snippet should show you the query works properly:
void Main()
{
var pollingPeriod = TimeSpan.FromSeconds(5);
var dbQueryTimeout = TimeSpan.FromSeconds(10);
//You will want to have your Rx query timeout after the expected silence of the timer, and then further maximum silence.
var rxQueryTimeOut = pollingPeriod + dbQueryTimeout;
var scheduler = new EventLoopScheduler(ts => new Thread(ts) { Name = "DatabasePoller" });
var query = Observable.Timer(pollingPeriod, scheduler)
.SelectMany(_ => DatabaseQuery().ToObservable())
.Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler)
.Retry() //Loop on errors
.Repeat(); //Loop on success
query.StartWith("Seed")
.TimeInterval(scheduler) //Just to debug, print the timing gaps.
.Dump();
}
// Define other methods and classes here
private static int delay = 9;
private static int delayModifier = 1;
public async Task<string> DatabaseQuery()
{
//Oscillate the delay between 3 and 12 seconds
delay += delayModifier;
var timespan = TimeSpan.FromSeconds(delay);
if (delay < 4 || delay > 11)
delayModifier *= -1;
timespan.Dump("delay");
await Task.Delay(timespan);
return "Value";
}
The results look like:
Seed 00:00:00.0125407
Timeout 00:00:15.0166379
Timeout 00:00:15.0124480
Timeout 00:00:15.0004520
Timeout 00:00:15.0013296
Timeout 00:00:15.0140864
Value 00:00:14.0251731
Value 00:00:13.0231958
Value 00:00:12.0162236
Value 00:00:11.0138606
The key part of the sample is....
var query = Observable.Timer(TimeSpan.FromSeconds(5), scheduler)
.SelectMany(_ => DatabaseQuery().ToObservable())
.Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler)
.Retry() //Loop on errors
.Repeat(); //Loop on success
EDIT: Here is a further explanation of how to arrive at this solution. https://github.com/LeeCampbell/RxCookbook/blob/master/Repository/Polling.md
I think this is what you should do:
var query =
from n in Observable.Interval(TimeSpan.FromSeconds(5.0))
from ds in Observable.Amb(
Observable.Start(() => /* Your DataSet query */),
Observable
.Timer(TimeSpan.FromSeconds(10.0))
.Select(_ => new DataSet("TimeOut")))
select ds;
This triggers a new query with an interval between executions of 5 seconds. It's not 5 seconds since the last one started, it's 5 seconds since the last one ended.
Then you try your query, but you .Amb
it with a timer that returns a special DataSet
after 10 seconds. If your query finishes before 10 seconds is up then it wins, but otherwise the special DataSet
is returned. The .Amb
operator is basically a "race" operator - the first observable to produce a value wins.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With