Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Reactive extension fixed Interval between async calls when call is longer than Interval length

Here is my Interval definition:

m_interval = Observable.Interval(TimeSpan.FromSeconds(5), m_schedulerProvider.EventLoop)
                .ObserveOn(m_schedulerProvider.EventLoop)
                .Select(l => Observable.FromAsync(DoWork))
                .Concat()
                .Subscribe();

In the code above, I feed the IScheduler in both Interval & ObserveOn from a SchedulerProvider so that I can unit test faster (TestScheduler.AdvanceBy). Also, DoWork is an async method.

In my particular case, I want the DoWork function to be called every 5 seconds. The issue here is that I want the 5 seconds to be the time between the end of DoWork and the start of the other. So if DoWork takes more than 5 seconds to execute, let's say 10 seconds, the first call would be at 5 seconds and the second call at 15 seconds.

Unfortunately, the following test proves it does not behave like that:

[Fact]
public void MultiPluginStatusHelperShouldWaitForNextQuery()
{    
    m_queryHelperMock
        .Setup(x => x.CustomQueryAsync())
        .Callback(() => Thread.Sleep(10000))
        .Returns(Task.FromResult(new QueryCompletedEventData()))
        .Verifiable()
    ;

    var multiPluginStatusHelper = m_container.GetInstance<IMultiPluginStatusHelper>();
    multiPluginStatusHelper.MillisecondsInterval = 5000;
    m_testSchedulerProvider.EventLoopScheduler.AdvanceBy(TimeSpan.FromMilliseconds(5000).Ticks);
    m_testSchedulerProvider.EventLoopScheduler.AdvanceBy(TimeSpan.FromMilliseconds(5000).Ticks);

    m_queryHelperMock.Verify(x => x.CustomQueryAsync(), Times.Once);
}

The DoWork calls the CustomQueryAsync and the test fails saying that is was called twice. It should only be called once because of the delay forced with .Callback(() => Thread.Sleep(1000)).

What am I doing wrong here ?

My actual implementation comes from this example.

like image 292
Joel Bourbonnais Avatar asked Sep 30 '15 19:09

Joel Bourbonnais


3 Answers

This problem comes up a lot, usually when polling some non-observable data source. When I come across it, I use a RepeatAfterDelay operator I wrote a while back:

public static IObservable<T> RepeatAfterDelay<T>(this IObservable<T> source, TimeSpan delay, IScheduler scheduler)
{
    var repeatSignal = Observable
        .Empty<T>()
        .Delay(delay, scheduler);

    // when source finishes, wait for the specified
    // delay, then repeat.
    return source.Concat(repeatSignal).Repeat();
}

And this is how I use it:

// do first set of work immediately, and then every 5 seconds do it again
m_interval = Observable
    .FromAsync(DoWork)
    .RepeatAfterDelay(TimeSpan.FromSeconds(5), scheduler)
    .Subscribe();

// wait 5 seconds, then do first set of work, then again every 5 seconds
m_interval = Observable
    .Timer(TimeSpan.FromSeconds(5), scheduler)
    .SelectMany(_ => Observable
        .FromAsync(DoWork)
        .RepeatAfterDelay(TimeSpan.FromSeconds(5), scheduler))
    .Subscribe();
like image 90
Brandon Avatar answered Oct 15 '22 13:10

Brandon


Your problem is that your code is mixing lazy (Observable) and non-lazy (Task) constructs. While your first Task is executing the Interval will fire again and create a new task in the Select operator. If you want to avoid this behavior you need to wrap your Observable into a Defer block:

m_interval = Observable.Interval(TimeSpan.FromSeconds(5), m_schedulerProvider.EventLoop)
                .ObserveOn(m_schedulerProvider.EventLoop)
                 //I think `Defer` implicitly wraps Tasks, if not wrap it in `FromAsync` Again
                .Select(l => Observable.Defer(() => DoWork()))
                .Concat()
                .Subscribe();

The result of this is that each Observable will only execute the deferred Task when it is subscribed to, i.e. when the previous completes.

Notably this does have a problem if your producer is producing much faster than you can consume, it will begin to pile up and each your memory. As an alternative I would propose using this GenerateAsync implementation:

    public static IObservable<TOut> GenerateAsync<TResult, TOut>(
    Func<Task<TResult>> initialState,
    Func<TResult, bool> condition,
    Func<TResult, Task<TResult>> iterate,
    Func<TResult, TimeSpan> timeSelector,
    Func<TResult, TOut> resultSelector,
    IScheduler scheduler = null) 
{
  var s = scheduler ?? Scheduler.Default;

  return Observable.Create<TOut>(async obs => {

    //You have to do your initial time delay here.
    var init = await initialState();
    return s.Schedule(init, timeSelector(init), async (state, recurse) => 
    {
      //Check if we are done
      if (!condition(state))
      {
        obs.OnCompleted();
        return;
      }

      //Process the result
      obs.OnNext(resultSelector(state));

      //Initiate the next request
      state = await iterate(state);

      //Recursively schedule again
      recurse(state, timeSelector(state));

    });
  });
}

GenerateAsync(DoWork /*Initial state*/, 
              _ => true /*Forever*/, 
              _ => DoWork() /*Do your async task*/,
              _ => TimeSpan.FromSeconds(5) /*Delay between events*/, 
              _ => _ /*Any transformations*/, 
              scheduler)
.Subscribe();

The above removes the issue of producer/consumer races, by not scheduling the next event until after the first one is done.

like image 2
paulpdaniels Avatar answered Oct 15 '22 13:10

paulpdaniels


While @Brandon's solution is nice and clean I discovered that it blocks a thread to wait for the delay timer. Non-blocking alternative can look something like:

public static IObservable<T> DelayRepeat<T>(this IObservable<T> source, TimeSpan delay) =>
    source
        .Concat(
            Observable.Create<T>(async observer =>
            {
                await Task.Delay(delay);
                observer.OnCompleted();
            }))
        .Repeat();
like image 2
Andrii Litvinov Avatar answered Oct 15 '22 14:10

Andrii Litvinov