Here is my Interval definition:
m_interval = Observable.Interval(TimeSpan.FromSeconds(5), m_schedulerProvider.EventLoop)
    .ObserveOn(m_schedulerProvider.EventLoop)
    .Select(l => Observable.FromAsync(DoWork))
    .Concat()
    .Subscribe();
In the code above, I pass the IScheduler to both Interval and ObserveOn from a SchedulerProvider so that I can unit test faster (TestScheduler.AdvanceBy). Also, DoWork is an async method.
In my particular case, I want the DoWork function to be called every 5 seconds. The catch is that I want the 5 seconds to be the time between the end of one DoWork call and the start of the next. So if DoWork takes more than 5 seconds to execute, let's say 10 seconds, the first call would be at 5 seconds and the second call at 15 seconds.
Unfortunately, the following test proves it does not behave like that:
[Fact]
public void MultiPluginStatusHelperShouldWaitForNextQuery()
{
    m_queryHelperMock
        .Setup(x => x.CustomQueryAsync())
        .Callback(() => Thread.Sleep(10000))
        .Returns(Task.FromResult(new QueryCompletedEventData()))
        .Verifiable();
    var multiPluginStatusHelper = m_container.GetInstance<IMultiPluginStatusHelper>();
    multiPluginStatusHelper.MillisecondsInterval = 5000;
    m_testSchedulerProvider.EventLoopScheduler.AdvanceBy(TimeSpan.FromMilliseconds(5000).Ticks);
    m_testSchedulerProvider.EventLoopScheduler.AdvanceBy(TimeSpan.FromMilliseconds(5000).Ticks);
    m_queryHelperMock.Verify(x => x.CustomQueryAsync(), Times.Once);
}
DoWork calls CustomQueryAsync, and the test fails saying that it was called twice. It should only be called once because of the delay forced with .Callback(() => Thread.Sleep(10000)).
What am I doing wrong here?
My actual implementation comes from this example.
This problem comes up a lot, usually when polling some non-observable data source. When I come across it, I use a RepeatAfterDelay operator I wrote a while back:
public static IObservable<T> RepeatAfterDelay<T>(this IObservable<T> source, TimeSpan delay, IScheduler scheduler)
{
    var repeatSignal = Observable
        .Empty<T>()
        .Delay(delay, scheduler);
    // when source finishes, wait for the specified
    // delay, then repeat.
    return source.Concat(repeatSignal).Repeat();
}
And this is how I use it:
// do first set of work immediately, and then every 5 seconds do it again
m_interval = Observable
    .FromAsync(DoWork)
    .RepeatAfterDelay(TimeSpan.FromSeconds(5), scheduler)
    .Subscribe();
// wait 5 seconds, then do first set of work, then again every 5 seconds
m_interval = Observable
    .Timer(TimeSpan.FromSeconds(5), scheduler)
    .SelectMany(_ => Observable
        .FromAsync(DoWork)
        .RepeatAfterDelay(TimeSpan.FromSeconds(5), scheduler))
    .Subscribe();
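To make the timing of the second usage concrete, here is a small sketch in plain C# (no Rx involved) that just does the arithmetic. PollingTimeline and FixedDelayStarts are hypothetical names, and the sketch assumes every DoWork run takes the same amount of time and ignores scheduling overhead.

```csharp
using System;
using System.Collections.Generic;

public static class PollingTimeline
{
    // Start times (in seconds) of each run when the next run begins a fixed
    // delay after the previous run has *ended*.
    public static List<int> FixedDelayStarts(int initialDelay, int workDuration, int delay, int runs)
    {
        var starts = new List<int>();
        int start = initialDelay;
        for (int i = 0; i < runs; i++)
        {
            starts.Add(start);
            // Next start = end of this run (start + workDuration) plus the pause.
            start = start + workDuration + delay;
        }
        return starts;
    }
}
```

Under those assumptions, FixedDelayStarts(5, 10, 5, 3) yields 5, 20 and 35: a 5 second initial wait, a 10 second run, then a 5 second pause before each following run.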
Your problem is that your code is mixing lazy (Observable) and non-lazy (Task) constructs. While your first Task is executing, the Interval will fire again and create a new task in the Select operator. If you want to avoid this behavior, you need to wrap your Observable in a Defer block:
m_interval = Observable.Interval(TimeSpan.FromSeconds(5), m_schedulerProvider.EventLoop)
    .ObserveOn(m_schedulerProvider.EventLoop)
    // `Defer` expects a factory that returns an observable, not a plain Task,
    // so the call to DoWork is wrapped in `FromAsync` inside the `Defer`.
    .Select(l => Observable.Defer(() => Observable.FromAsync(DoWork)))
    .Concat()
    .Subscribe();
The result of this is that each Observable will only execute the deferred Task when it is subscribed to, i.e. when the previous one completes.
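The lazy versus non-lazy distinction can be seen without Rx at all. The following is a minimal sketch using only the base class library: calling an async-style method starts the work immediately, while wrapping the call in a delegate postpones it until the delegate is invoked, which is the role Defer plays for observables. LazyVsEager, Started and this DoWork are hypothetical stand-ins, not the code from the question.

```csharp
using System;
using System.Threading.Tasks;

public static class LazyVsEager
{
    // Counts how many times the work has actually been started.
    public static int Started;

    // Hypothetical stand-in for the question's DoWork.
    public static Task DoWork()
    {
        Started++;
        return Task.CompletedTask;
    }

    public static (int AfterEager, int AfterLazy, int AfterInvoke) Run()
    {
        Started = 0;

        Task eager = DoWork();              // non-lazy: the work starts right here
        int afterEager = Started;

        Func<Task> lazy = () => DoWork();   // lazy: only a recipe, nothing starts yet
        int afterLazy = Started;

        Task second = lazy();               // the work starts when the recipe is invoked
        int afterInvoke = Started;

        return (afterEager, afterLazy, afterInvoke);
    }
}
```

Calling LazyVsEager.Run() shows the counter moving only when DoWork is actually called, not when the delegate is created.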
Note that this does have a problem: if your producer is producing much faster than you can consume, the pending work will begin to pile up and eat your memory. As an alternative, I would propose using this GenerateAsync implementation:
public static IObservable<TOut> GenerateAsync<TResult, TOut>(
    Func<Task<TResult>> initialState,
    Func<TResult, bool> condition,
    Func<TResult, Task<TResult>> iterate,
    Func<TResult, TimeSpan> timeSelector,
    Func<TResult, TOut> resultSelector,
    IScheduler scheduler = null)
{
    var s = scheduler ?? Scheduler.Default;
    return Observable.Create<TOut>(async obs => {
        //You have to do your initial time delay here.
        var init = await initialState();
        return s.Schedule(init, timeSelector(init), async (state, recurse) =>
        {
            //Check if we are done
            if (!condition(state))
            {
                obs.OnCompleted();
                return;
            }
            //Process the result
            obs.OnNext(resultSelector(state));
            //Initiate the next request
            state = await iterate(state);
            //Recursively schedule again
            recurse(state, timeSelector(state));
        });
    });
}
GenerateAsync(DoWork /*Initial state*/,
              _ => true /*Forever*/,
              _ => DoWork() /*Do your async task*/,
              _ => TimeSpan.FromSeconds(5) /*Delay between events*/,
              _ => _ /*Any transformations*/,
              scheduler)
    .Subscribe();
The above removes the issue of producer/consumer races by not scheduling the next event until after the previous one is done.
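The same ordering guarantee can be illustrated with a plain async loop, which is a different and much simpler technique than GenerateAsync but shows the idea in isolation: the next run is not started until the previous one has completed and the pause has elapsed. This is only a sketch; SequentialPolling, RunAsync and the injected work and pause delegates are hypothetical names introduced for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class SequentialPolling
{
    // Runs `work` `count` times. Each run is awaited to completion, then
    // `pause` is awaited, and only then does the next run start, so runs
    // can never overlap or pile up.
    public static async Task<List<string>> RunAsync(Func<Task> work, Func<Task> pause, int count)
    {
        var log = new List<string>();
        for (int i = 0; i < count; i++)
        {
            log.Add($"start {i}");
            await work();
            log.Add($"end {i}");
            await pause();
        }
        return log;
    }
}
```

In real use, pause could be something like () => Task.Delay(TimeSpan.FromSeconds(5)); passing it in as a delegate keeps the sketch testable without waiting on a real timer.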
While @Brandon's solution is nice and clean, I discovered that it blocks a thread while waiting for the delay timer. A non-blocking alternative could look something like:
public static IObservable<T> DelayRepeat<T>(this IObservable<T> source, TimeSpan delay) =>
    source
        .Concat(
            Observable.Create<T>(async observer =>
            {
                await Task.Delay(delay);
                observer.OnCompleted();
            }))
        .Repeat();