Reactive - Combining Observable Interval with manual triggers

I have Observable.Interval(TimeSpan.FromSeconds(1)) and a subscriber that checks something in the database every time the interval fires. Sometimes, after one of those checks, I want to perform another check immediately (call the subscriber again), because I know there is something in the queue.

I have already managed to achieve something similar by combining Interval with a do/while loop inside the subscriber method:

Observable
    .Interval(TimeSpan.FromSeconds(1))
    .Sample(TimeSpan.FromSeconds(1)) // to avoid multiple 'stacked' intervals
    .Subscribe(RepeatAction);


private void RepeatAction(long _)
{
    bool wasSuccess;
    do
    {
        wasSuccess = CheckingInDB(); //Long operation
    } while (wasSuccess);
}

But is it possible to achieve that kind of behavior using only Rx operators?

Szymon Knop asked May 10 '16 07:05


1 Answer

Yes. It is possible.

First, though, you have a misunderstanding about Rx.

If you run this code:

void Main()
{
    Observable
        .Interval(TimeSpan.FromSeconds(1.0))
        .Sample(TimeSpan.FromSeconds(1.0))
        .Timestamp()
        .Subscribe(RepeatAction);
}

private void RepeatAction(Timestamped<long> _)
{
    Console.WriteLine(_.Timestamp);
    Thread.Sleep(10000);
}

You'll get this result:

2016/05/11 10:37:57 +00:00
2016/05/11 10:38:07 +00:00
2016/05/11 10:38:17 +00:00
2016/05/11 10:38:27 +00:00

You'll see that the gap between values is 10 seconds, not 1. The Interval operator simply ensures that the gap between values is at least the duration of the TimeSpan; if the observer takes longer than that, the gap becomes as long as the observer takes. It doesn't queue up the values.

Another way of looking at it is that .Sample(TimeSpan.FromSeconds(1)) does nothing here, because .Interval(TimeSpan.FromSeconds(1.0)) already ensures that the minimum gap between values is 1 second.

Now, to solve the problem using purely Rx operators, try this:

var query =
    Observable
        .Interval(TimeSpan.FromSeconds(1.0))
        .Select(_ =>
            Observable
                .While(
                    () => CheckingInDB(),
                    Observable.Return(Unit.Default)))
        .Switch();

This checks the database every second, but once a check succeeds it immediately repeats the check until one fails. Then it waits 1 second and tries again.
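Note that the query does nothing until it is subscribed to. Here is a minimal sketch of how it might be wired up as a console program, assuming the System.Reactive NuGet package is referenced. The in-memory `Pending` queue and this body of `CheckingInDB` are hypothetical stand-ins for the real database check, which the question does not show:

```csharp
using System;
using System.Collections.Concurrent;
using System.Reactive;
using System.Reactive.Linq;

class Program
{
    // Hypothetical stand-in for the database queue (not from the original post).
    private static readonly ConcurrentQueue<string> Pending =
        new ConcurrentQueue<string>();

    // Hypothetical stand-in for the question's CheckingInDB():
    // returns true if an item was found and processed, false if the queue was empty.
    private static bool CheckingInDB()
    {
        if (Pending.TryDequeue(out var item))
        {
            Console.WriteLine($"{DateTimeOffset.Now:HH:mm:ss.fff} processed {item}");
            return true;
        }
        return false;
    }

    static void Main()
    {
        // Seed a few items so the first tick triggers several back-to-back checks.
        Pending.Enqueue("a");
        Pending.Enqueue("b");
        Pending.Enqueue("c");

        var query =
            Observable
                .Interval(TimeSpan.FromSeconds(1.0))
                .Select(_ =>
                    Observable
                        .While(
                            () => CheckingInDB(),
                            Observable.Return(Unit.Default)))
                .Switch();

        // Each Unit that reaches OnNext corresponds to one successful check.
        // Disposing the subscription stops the polling.
        using (query.Subscribe(
            _ => { },
            ex => Console.Error.WriteLine(ex)))
        {
            Console.WriteLine("Polling; press Enter to stop.");
            Console.ReadLine();
        }
    }
}
```

With this seed data, the three items should be processed in quick succession on the first tick, after which the query goes back to checking once per second.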

Enigmativity answered Oct 18 '22 22:10