
Database polling with Reactive Extensions

I have to query a database periodically to know the state of a legacy system. I've thought of wrapping the query in an Observable, but I don't know the correct way to do it.

Basically, it will be the same query every 5 seconds. But I'm afraid I will have to face these problems:

  • What if the execution of the query takes 10 seconds? I don't want to execute a new query while the previous one is still being processed.
  • Also, there should be a timeout. If the current query hasn't completed after, for example, 20 seconds, an informative message should be logged and a new attempt (the same query) should be made.

Extra details:

  • The query is just a SELECT that returns a dataset with a list of status codes (working, faulted).
  • The Observable sequence will always take the latest data received from the query, something like the Switch extension method.
  • I would like to wrap the database query (a lengthy operation) in a Task, but I'm not sure if it's the best option.

I'm almost sure that the query should be executed on another thread, but I have no idea what the observable should look like, even having read Introduction to Rx by Lee Campbell.

SuperJMN asked Nov 14 '15 23:11


2 Answers

This is a fairly classic case of using Rx to poll another system. Most people will use Observable.Interval as their go-to operator, and for most it will be fine.
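
For reference, a minimal sketch of what Interval-based polling might look like. The names IntervalSketch, PollEvery and runQuery are hypothetical, and the snippet assumes the System.Reactive package is referenced:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

static class IntervalSketch
{
    // Ticks every `period` and projects each tick into one call of runQuery.
    // runQuery is a hypothetical stand-in for the real database call.
    public static IObservable<T> PollEvery<T>(TimeSpan period, Func<Task<T>> runQuery)
    {
        return Observable.Interval(period)
            .SelectMany(_ => runQuery().ToObservable());
    }
}
```

Nothing in this sketch stops a slow query from overlapping the next tick, and there is no timeout or retry.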

However, you have specific requirements on timeouts and retry. In this case I think you are better off using a combination of operators:

  • Observable.Timer to allow you to execute your query after a specified delay
  • Timeout to identify any database queries that have overrun
  • ToObservable() to map your Task results to an observable sequence.
  • Retry to allow you to recover after timeouts
  • Repeat to allow you to continue after successful database queries. This will also keep that initial period/gap between the completion of the previous database query and the commencement of the next one.

This working LINQPad snippet shows the query behaving as expected:

void Main()
{
    var pollingPeriod = TimeSpan.FromSeconds(5);
    var dbQueryTimeout = TimeSpan.FromSeconds(10);

    //The Rx timeout must cover the expected silence of the timer plus the maximum time allowed for the database query.
    var rxQueryTimeOut = pollingPeriod + dbQueryTimeout;

    var scheduler = new EventLoopScheduler(ts => new Thread(ts) { Name = "DatabasePoller" });

    var query = Observable.Timer(pollingPeriod, scheduler)
                    .SelectMany(_ => DatabaseQuery().ToObservable())
                    .Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler)
                    .Retry()    //Loop on errors
                    .Repeat();  //Loop on success

    query.StartWith("Seed")
        .TimeInterval(scheduler)    //Just to debug, print the timing gaps.
        .Dump();
}

// Define other methods and classes here
private static int delay = 9;
private static int delayModifier = 1;
public async Task<string> DatabaseQuery()
{
    //Oscillate the delay between 3 and 12 seconds
    delay += delayModifier;
    var timespan = TimeSpan.FromSeconds(delay);
    if (delay < 4 || delay > 11)
        delayModifier *= -1;
    timespan.Dump("delay");
    await Task.Delay(timespan);
    return "Value";
}

The results look like:

Seed 00:00:00.0125407
Timeout 00:00:15.0166379
Timeout 00:00:15.0124480
Timeout 00:00:15.0004520
Timeout 00:00:15.0013296
Timeout 00:00:15.0140864
Value 00:00:14.0251731
Value 00:00:13.0231958
Value 00:00:12.0162236
Value 00:00:11.0138606

The key part of the sample is:

var query = Observable.Timer(TimeSpan.FromSeconds(5), scheduler)
                .SelectMany(_ => DatabaseQuery().ToObservable())
                .Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler)
                .Retry()    //Loop on errors
                .Repeat();  //Loop on success

EDIT: Here is a further explanation of how to arrive at this solution. https://github.com/LeeCampbell/RxCookbook/blob/master/Repository/Polling.md
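
The question also asks for a message to be logged when a query overruns. The following is a rough sketch of one way that might be done, not part of the original answer: it uses the Timeout overload without a fallback sequence, so an overrun surfaces as an error that Do can log before Retry resubscribes. PollingSketch, Poll, runQuery and log are hypothetical names, and the System.Reactive package is assumed.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

static class PollingSketch
{
    // runQuery stands in for the real database call; log for the real logger.
    public static IObservable<string> Poll(
        TimeSpan pollingPeriod,
        TimeSpan dbQueryTimeout,
        Func<Task<string>> runQuery,
        Action<string> log)
    {
        return Observable.Timer(pollingPeriod)
            .SelectMany(_ => runQuery().ToObservable())
            // No fallback sequence here: silence longer than the timer delay
            // plus the query allowance ends the sequence with a TimeoutException.
            .Timeout(pollingPeriod + dbQueryTimeout)
            // Log any error (timeout or query failure) before it reaches Retry.
            .Do(_ => { }, ex => log("Polling attempt failed: " + ex.Message))
            .Retry()    // Resubscribe after a logged error
            .Repeat();  // Resubscribe after a successful result
    }
}
```

A call might look like PollingSketch.Poll(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(10), DatabaseQuery, Console.WriteLine).Subscribe(Console.WriteLine). Note that Timeout only stops waiting for the result; the Task behind an overrunning query keeps running unless the query itself supports cancellation.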

Lee Campbell answered Sep 20 '22 14:09


I think this is what you should do:

var query =
    from n in Observable.Interval(TimeSpan.FromSeconds(5.0))
    from ds in Observable.Amb(
        Observable.Start(() => /* Your DataSet query */),
        Observable
            .Timer(TimeSpan.FromSeconds(10.0))
            .Select(_ => new DataSet("TimeOut")))
    select ds;

This triggers a new query with an interval between executions of 5 seconds. It's not 5 seconds since the last one started, it's 5 seconds since the last one ended.

Then you try your query, but you .Amb it with a timer that returns a special DataSet after 10 seconds. If your query finishes before 10 seconds is up then it wins, but otherwise the special DataSet is returned. The .Amb operator is basically a "race" operator - the first observable to produce a value wins.
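
To see the race in isolation, here is a small sketch under assumptions: AmbSketch and WithFallback are hypothetical names, and the System.Reactive package is assumed to be referenced.

```csharp
using System;
using System.Reactive.Linq;

static class AmbSketch
{
    // Races `work` against a timer. Whichever sequence reacts first is
    // propagated; the other subscription is disposed.
    public static IObservable<T> WithFallback<T>(
        IObservable<T> work, TimeSpan limit, T fallback)
    {
        return Observable.Amb(
            work,
            Observable.Timer(limit).Select(_ => fallback));
    }
}
```

For example, AmbSketch.WithFallback(Observable.Return("fast"), TimeSpan.FromSeconds(10), "TimeOut") yields "fast", while passing Observable.Never<string>() as the work yields "TimeOut" once the limit elapses. Amb picks the first sequence to send any notification, so a query that fails before the timer fires wins the race with its error.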

Enigmativity answered Sep 20 '22 14:09