How to unit test Observable.Sample()?

I have a class encapsulating Observable.Sample() such as:

class IntervalRequestScheduler
{
    private Subject<Action> _requests = new Subject<Action>();
    private IDisposable _observable;

    public IntervalRequestScheduler(TimeSpan requestLimit)
    {
        _observable = _requests.Sample(requestLimit)
                               .Subscribe(action => action());
    }

    public Task<T> ScheduleRequest<T>(Func<Task<T>> request)
    {
        var tcs = new TaskCompletionSource<T>();
        _requests.OnNext(async () =>
        {
            try
            {
                T result = await request();
                tcs.SetResult(result);
            }
            catch (Exception ex)
            {
                tcs.SetException(ex);
            }
        });
        return tcs.Task;
    }
}

How can I test it properly? All my attempts either exit prematurely or cause deadlocks.

abatishchev asked Oct 04 '14 03:10


1 Answer

Controlling Time in Rx

The key to unit testing Rx is understanding how to control time with the TestScheduler. All time-based operators in the Rx libraries take an optional IScheduler parameter in order to let you do this. Your own time-based operators should do this as well.
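As a minimal sketch of what "controlling time" means, the test below passes a TestScheduler to Observable.Timer and moves the virtual clock by hand. The class and method names are made up for illustration, and NUnit's classic Assert methods are assumed, as in the rest of this answer.

```csharp
using System;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;
using NUnit.Framework;

[TestFixture]
public class VirtualTimeTests // hypothetical name, for illustration only
{
    [Test]
    public void TimerFiresOnlyWhenVirtualTimeIsAdvanced()
    {
        var scheduler = new TestScheduler();
        var fired = false;

        // The timer runs on the test scheduler, so no real time passes.
        using (Observable.Timer(TimeSpan.FromMinutes(5), scheduler)
                         .Subscribe(_ => fired = true))
        {
            // Four virtual minutes in: the timer is not due yet.
            scheduler.AdvanceBy(TimeSpan.FromMinutes(4).Ticks);
            Assert.IsFalse(fired);

            // One more virtual minute: the timer is now due and fires.
            scheduler.AdvanceBy(TimeSpan.FromMinutes(1).Ticks);
            Assert.IsTrue(fired);
        }
    }
}
```

The whole test finishes immediately, because the five minutes exist only on the scheduler's virtual clock.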

So the first thing we need to do is modify your IntervalRequestScheduler constructor to facilitate this:

public IntervalRequestScheduler(TimeSpan requestLimit,
                                // The scheduler is optional
                                IScheduler scheduler = null)
{
    // assign a default if necessary
    scheduler = scheduler ?? Scheduler.Default;

    // make sure to pass the scheduler in to `Sample`
    _observable = _requests.Sample(requestLimit, scheduler)
                            .Subscribe(action => action());
}

With this change in place, we can now control time!

Here's an example unit test that calls an IntervalRequestScheduler instance's ScheduleRequest method ten times, then advances time by the sample duration of one second and checks that only one task has completed:

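// Needs the System, System.Linq, System.Threading.Tasks,
// Microsoft.Reactive.Testing and NUnit.Framework namespaces.
[Test]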
public void ASingleTaskIsCompletedWhenTenAreScheduledWithinInterval()
{
    var scheduler = new TestScheduler();
    var sampleDuration = TimeSpan.FromSeconds(1);

    var intervalRequestScheduler = new IntervalRequestScheduler(sampleDuration,
                                                                scheduler);

    // use a helper method to create "requests"
    var taskFactories = Enumerable.Range(0, 10).Select(CreateRequest);

    // schedule the requests and collect the tasks into an array
    var tasks =
        (from tf in taskFactories
            select intervalRequestScheduler.ScheduleRequest(tf)).ToArray();

    // prove no tasks have completed
    var completedTasksCount = tasks.Count(t => t.IsCompleted);
    Assert.AreEqual(0, completedTasksCount);

    // this is the key - we advance time simulating a sampling period.
    scheduler.AdvanceBy(sampleDuration.Ticks);

    // now we see exactly one task has completed
    completedTasksCount = tasks.Count(t => t.IsCompleted);
    Assert.AreEqual(1, completedTasksCount);
}

// helper to create requests
public Func<Task<int>> CreateRequest(int result)
{
    return () => Task.Run(() => result);
}

Aside

So far I have focussed only on the question at hand, but I did want to add that the actual motivation for IntervalRequestScheduler is a little unclear and the code looks a bit messy. There are possibly better ways to achieve this without mixing wrapped Tasks and IObservables. Staying in the Rx world also makes it easier to make tests predictable by controlling the schedulers involved.

In the above code, there is some nastiness I've glossed over: the task invocation is asynchronous (CreateRequest uses Task.Run), so the one started task may not actually have completed by the time you assert on it. To be absolutely correct you need to get into the messy business of monitoring tasks and giving them time to start and finish. But hopefully you can see that the TestScheduler avoids all this mess on the Rx side.
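One way to sidestep that timing problem in a test, sketched here under assumptions, is to hand in requests whose tasks are already completed. With Task.FromResult the await inside ScheduleRequest continues synchronously, so the TaskCompletionSource is set before AdvanceBy returns. CreateCompletedRequest and the test names are hypothetical, and the class is a compact copy of the one above, repeated only so the sketch compiles on its own.

```csharp
using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;
using Microsoft.Reactive.Testing;
using NUnit.Framework;

// Compact copy of the class from above, including the scheduler parameter.
public class IntervalRequestScheduler
{
    private readonly Subject<Action> _requests = new Subject<Action>();
    private readonly IDisposable _subscription;

    public IntervalRequestScheduler(TimeSpan requestLimit, IScheduler scheduler = null)
    {
        _subscription = _requests.Sample(requestLimit, scheduler ?? Scheduler.Default)
                                 .Subscribe(action => action());
    }

    public Task<T> ScheduleRequest<T>(Func<Task<T>> request)
    {
        var tcs = new TaskCompletionSource<T>();
        _requests.OnNext(async () =>
        {
            try { tcs.SetResult(await request()); }
            catch (Exception ex) { tcs.SetException(ex); }
        });
        return tcs.Task;
    }
}

[TestFixture]
public class DeterministicSamplingTests // hypothetical name
{
    // Hypothetical helper: the returned task is already completed,
    // so no thread pool work is involved.
    private static Func<Task<int>> CreateCompletedRequest(int result)
    {
        return () => Task.FromResult(result);
    }

    [Test]
    public void OnlyTheLastRequestInTheWindowCompletes()
    {
        var scheduler = new TestScheduler();
        var sampleDuration = TimeSpan.FromSeconds(1);
        var sut = new IntervalRequestScheduler(sampleDuration, scheduler);

        var tasks = Enumerable.Range(0, 10)
            .Select(i => sut.ScheduleRequest(CreateCompletedRequest(i)))
            .ToArray();

        // Nothing runs until virtual time reaches the sampling tick.
        Assert.AreEqual(0, tasks.Count(t => t.IsCompleted));

        scheduler.AdvanceBy(sampleDuration.Ticks);

        // Sample forwards only the most recent value in the window (request 9).
        Assert.AreEqual(1, tasks.Count(t => t.IsCompleted));
        Assert.AreEqual(9, tasks[9].Result);
    }
}
```

This only removes the race from the test. Real requests that complete asynchronously still need their tasks awaited before you assert on them.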

If you want to constrain the number of jobs run to a certain rate, why not just sample the input and project the output?

For example, say you had a request function of type Func<int,int> called runRequest and an IObservable<int> input stream called requests providing the input for each request (it could be a Subject<int>, for example). Then you could just have:

requests.Sample(TimeSpan.FromSeconds(1), scheduler)
        .Select(input => runRequest(input))
        .Subscribe(result => { /* do something with result */ });

No idea if this works for your scenario of course, but it may provoke some ideas!
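To illustrate why staying in Rx keeps tests predictable, here is a rough sketch of a test for a pipeline of that shape. The doubling runRequest is a stand-in, and the test class and method names are hypothetical; only Sample, Select, Subscribe and TestScheduler come from Rx itself.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Microsoft.Reactive.Testing;
using NUnit.Framework;

[TestFixture]
public class SampledPipelineTests // hypothetical name
{
    [Test]
    public void OnlyTheLatestInputPerWindowIsProcessed()
    {
        var scheduler = new TestScheduler();
        var requests = new Subject<int>();
        Func<int, int> runRequest = input => input * 2; // stand-in request
        var results = new List<int>();
        var window = TimeSpan.FromSeconds(1);

        using (requests.Sample(window, scheduler)
                       .Select(input => runRequest(input))
                       .Subscribe(result => results.Add(result)))
        {
            // Three inputs arrive within the first sampling window.
            requests.OnNext(1);
            requests.OnNext(2);
            requests.OnNext(3);
            Assert.AreEqual(0, results.Count);

            // At the tick, only the latest input (3) is run.
            scheduler.AdvanceBy(window.Ticks);
            CollectionAssert.AreEqual(new[] { 6 }, results);

            // No new input in the second window, so nothing more is emitted.
            scheduler.AdvanceBy(window.Ticks);
            CollectionAssert.AreEqual(new[] { 6 }, results);
        }
    }
}
```

Because everything runs synchronously on the virtual clock, there are no tasks to wait for and the assertions are deterministic.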

James World answered Nov 18 '22 15:11