I have a class encapsulating Observable.Sample()
such as:
class IntervalRequestScheduler
{
private Subject<Action> _requests = new Subject<Action>();
private IDisposable _observable;
public IntervalRequestScheduler(TimeSpan requestLimit)
{
_observable = _requests.Sample(requestLimit)
.Subscribe(action => action());
}
public Task<T> ScheduleRequest<T>(Func<Task<T>> request)
{
var tcs = new TaskCompletionSource<T>();
_requests.OnNext(async () =>
{
try
{
T result = await request();
tcs.SetResult(result);
}
catch (Exception ex)
{
tcs.SetException(ex);
}
});
return tcs.Task;
}
}
How can I test it properly? All my attempts either exit prematurely or cause deadlocks.
To key to unit testing Rx is understanding how to control time with the TestScheduler
. All time-based operators in the Rx libraries take an optional IScheduler
parameter in order to let you do this. Your time-based operators should do this as well.
So the first thing we need to do is modify your IntervalRequestScheduler
constructor to facilitate this:
public IntervalRequestScheduler(TimeSpan requestLimit,
// The scheduler is optional
IScheduler scheduler = null)
{
// assign a default if necessary
scheduler = scheduler ?? Scheduler.Default;
// make sure to pass the scheduler in to `Sample`
_observable = _requests.Sample(requestLimit, scheduler)
.Subscribe(action => action());
}
With this change in place, we can now control time!
Here's an example unit test that will call an IntervalRequestScheduler
instance's ScheduleRequest
method ten times - then advance time by the sample duration of one second and check that only one task has completed:
[Test]
public void ASingleTaskIsCompletedWhenTenAreScheduledWithinInterval()
{
var scheduler = new TestScheduler();
var sampleDuration = TimeSpan.FromSeconds(1);
var intervalRequestScheduler = new IntervalRequestScheduler(sampleDuration,
scheduler);
// use a helper method to create "requests"
var taskFactories = Enumerable.Range(0, 10).Select(CreateRequest);
// schedule the requests and collect the tasks into an array
var tasks =
(from tf in taskFactories
select intervalRequestScheduler.ScheduleRequest(tf)).ToArray();
// prove no tasks have completed
var completedTasksCount = tasks.Count(t => t.IsCompleted);
Assert.AreEqual(0, completedTasksCount);
// this is the key - we advance time simulating a sampling period.
scheduler.AdvanceBy(sampleDuration.Ticks);
// now we see exactly one task has completed
completedTasksCount = tasks.Count(t => t.IsCompleted);
Assert.AreEqual(1, completedTasksCount);
}
// helper to create requests
public Func<Task<int>> CreateRequest(int result)
{
return () => Task.Run(() => result);
}
I have until now just focussed on the question at hand - but I did want to add that the actual motivation for IntervalRequestScheduler
is a little unclear and the code looks a bit messy. There are possibly better ways to achieve this without mixing wrapped Tasks and IObservables. Staying in the Rx world also makes it easier to make tests predictable by controlling the schedulers involved. In the above code, there is some nastiness I've glossed over because the task invocation is asynchronous and it's possible that the one started task may not actually have completed by the time you test it - so to be absolutely correct you need to get into the messy business of monitoring tasks and giving time for them to start and finish. But hopefully you can see that the TestScheduler avoids all this mess on the Rx side.
If you want to constrain the number of jobs run to a certain rate, why not just sample the input and project the output?
For example - say you hand a request function of type Func<int,int>
called runRequest
and anIObservable<int> requests
input stream providing the inputs each request (could be a Subject<int>
for example). Then you could just have:
requests.Sample(TimeSpan.FromSeconds(1), scheduler)
.Select(input => request(input))
.Subscribe(result => /* DoSomethingWithResult */);
No idea if this works for your scenario of course, but it may provoke some ideas!
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With