I'm struggling to get my head around why the following test does not work:
    [Fact]
    public void repro()
    {
        var scheduler = new TestScheduler();
        var count = 0;

        // this observable is a simplification of the system under test
        // I've just included it directly in the test for clarity
        // in reality it is NOT accessible from the test code - it is
        // an implementation detail of the system under test
        // but by passing in a TestScheduler to the sut, the test code
        // can theoretically control the execution of the pipeline
        // but per this question, that doesn't work when using FromAsync
        Observable
            .Return(1)
            .Select(i => Observable.FromAsync(Whatever))
            .Concat()
            .ObserveOn(scheduler)
            .Subscribe(_ => Interlocked.Increment(ref count));

        Assert.Equal(0, count);

        // this call initiates the observable pipeline, but does not
        // wait until the entire pipeline has been executed before
        // returning control to the caller
        // the question is: why? Rx knows I'm instigating an async task
        // as part of the pipeline (that's the point of the FromAsync
        // method), so why can't it still treat the pipeline atomically
        // when I call Start() on the scheduler?
        scheduler.Start();

        // count is still zero at this point
        Assert.Equal(1, count);
    }

    private async Task<Unit> Whatever()
    {
        await Task.Delay(100);
        return Unit.Default;
    }
What I'm trying to do is run some asynchronous code (represented above by Whatever()) whenever an observable ticks. Importantly, I want those calls to be queued. More importantly, I want to be able to control the execution of the pipeline by using the TestScheduler.
It seems like the call to scheduler.Start() is instigating the execution of Whatever() but it isn't waiting until it completes. If I change Whatever() so that it is synchronous:
    private async Task<Unit> Whatever()
    {
        //await Task.Delay(100);
        return Unit.Default;
    }
then the test passes, but of course that defeats the purpose of what I'm trying to achieve. I could imagine there being a StartAsync() method on the TestScheduler that I could await, but that does not exist.
Can anyone tell me whether there's a way for me to instigate the execution of the reactive pipeline and wait for its completion even when it contains asynchronous calls?
Let me boil down your question to its essentials:
Is there a way, using the TestScheduler, to execute a reactive pipeline and wait for its completion even when it contains asynchronous calls?
I should warn you up front, there is no quick and easy answer here, no convenient "trick" that can be deployed.
To answer this question I think we need to clarify some points. The term "asynchronous call" in the question above seems to be used specifically to refer to methods with a Task or Task<T> signature - i.e. methods that use the Task Parallel Library (TPL) to run asynchronously.
This is important to note because Reactive Extensions (Rx) takes a different approach to handling asynchronous operations.
In Rx the introduction of concurrency is managed via a scheduler, a type implementing the IScheduler interface. Any operation that introduces concurrency should make available a scheduler parameter so that the caller can choose an appropriate scheduler. The core library slavishly adheres to this principle. So, for example, Delay allows specification of a scheduler but Where does not.
As you can see from the source, IScheduler provides a number of Schedule overloads. Operations requiring concurrency use these to schedule execution of work. Exactly how that work is executed is deferred completely to the scheduler. This is the power of the scheduler abstraction.
Rx operations introducing concurrency generally provide overloads that allow the scheduler to be omitted, and in that case select a sensible default. This is important to note, because if you want your code to be testable via the use of TestScheduler you must use a TestScheduler for all operations that introduce concurrency. A rogue method that doesn't allow this could scupper your testing efforts.
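As a minimal sketch of that principle (the class and method names are made up for illustration, and I'm assuming the System.Reactive and Microsoft.Reactive.Testing packages are referenced), here Delay is handed the TestScheduler, so the delayed value is only delivered once the test advances virtual time:

```csharp
using System;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

// Hypothetical illustration class, not part of Rx.
public static class SchedulerParameterSketch
{
    public static (int before, int after) Run()
    {
        var scheduler = new TestScheduler();
        var count = 0;

        Observable
            .Return(1)
            // Delay introduces concurrency, so it accepts a scheduler.
            .Delay(TimeSpan.FromSeconds(10), scheduler)
            .Subscribe(_ => count++);

        // Nothing has been delivered yet: the delayed work is only queued.
        var before = count;

        // Runs the queued work in virtual time; no real 10 second wait.
        scheduler.Start();

        return (before, count);
    }
}
```

Calling SchedulerParameterSketch.Run() should give before = 0 and after = 1. Had Delay been called without the scheduler argument, it would have picked its own default and the test would be back to waiting in real time.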
The TPL has its own abstraction to handle concurrency: the TaskScheduler. The idea is very similar. You can read about it here.
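There are two very important differences between the two abstractions. First, Rx schedulers have a notion of time, exposed through the Now property. TPL schedulers do not. Second, it is far less common to be able to supply a TaskScheduler to a method introducing concurrency (returning a Task or Task<T>). The vast majority of Task returning methods assume use of the default TaskScheduler and give you no choice about where work is run.
The motivation to use a TestScheduler is generally two-fold: to remove the need to wait for operations in real time by running them in virtual time, and to check that events occur at the expected points in time.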
The way this works depends entirely on the fact that schedulers have their own notion of time. Every time an operation is scheduled via an IScheduler, we specify when it must execute - either as soon as possible, or at a specific time in the future. The scheduler then queues work for execution and will execute it when the specified time (according to the scheduler itself) is reached.
When you call Start on the TestScheduler, it works by emptying the queue of all operations with execution times at or before its current notion of Now - and then advancing its clock to the next scheduled work time and repeating until its queue is empty.
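A rough sketch of that behaviour, scheduling work directly on a TestScheduler (the class name and the log strings are mine, purely for illustration). The work is queued out of order, and Start runs it in virtual-time order, moving the clock forward as it goes:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using Microsoft.Reactive.Testing;

// Hypothetical illustration class, not part of Rx.
public static class VirtualTimeSketch
{
    public static (List<string> order, long finalClock) Run()
    {
        var scheduler = new TestScheduler();
        var order = new List<string>();

        // Queue work out of order; nothing executes at this point.
        scheduler.Schedule(TimeSpan.FromHours(1), () => order.Add("one hour"));
        scheduler.Schedule(() => order.Add("as soon as possible"));
        scheduler.Schedule(TimeSpan.FromMinutes(1), () => order.Add("one minute"));

        // Drains the queue in due-time order, advancing the virtual clock
        // to each item's due time, and returns once the queue is empty.
        scheduler.Start();

        // Clock is the virtual time in ticks after the last item ran.
        return (order, scheduler.Clock);
    }
}
```

The returned order should be "as soon as possible", "one minute", "one hour", and the final clock should equal TimeSpan.FromHours(1).Ticks, all without any real waiting.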
This allows neat tricks like being able to test that an operation will never result in an event! If using real time this would be a challenging task, but with virtual time it's easy - once the scheduler queue is completely empty, the TestScheduler concludes that no further events will ever happen, since if nothing is left on its queue, there is nothing there to schedule further tasks. In fact Start returns at precisely this point. For this to work, clearly all concurrent operations to be measured must be scheduled on the TestScheduler.
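For example, a test along the following lines can assert that a pipeline never produces a value. This is only a sketch: the class name is invented, and the Where(_ => false) filter simply stands in for whatever logic is expected to suppress the event.

```csharp
using System;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

// Hypothetical illustration class, not part of Rx.
public static class NoEventSketch
{
    public static (int onNextCount, long finalClock) Run()
    {
        var scheduler = new TestScheduler();
        // Records every notification together with its virtual time.
        var observer = scheduler.CreateObserver<long>();

        Observable
            .Timer(TimeSpan.FromHours(1), scheduler)
            .Where(_ => false) // stand-in for logic that should swallow the event
            .Subscribe(observer);

        // Returns as soon as the queue is empty, i.e. when nothing
        // further can ever happen on this scheduler.
        scheduler.Start();

        var onNextCount = observer.Messages
            .Count(m => m.Value.Kind == NotificationKind.OnNext);
        return (onNextCount, scheduler.Clock);
    }
}
```

After Start returns, onNextCount should be 0 even though an hour of virtual time has elapsed. That conclusion is only valid because the Timer was scheduled on the TestScheduler.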
A custom operator that carelessly makes its own choice of scheduler without allowing that choice to be overridden, or an operation that uses its own form of concurrency without a notion of time (such as TPL based calls) will make it difficult, if not impossible, to control execution via a TestScheduler.
If you have an asynchronous operation run by other means, judicious use of the AdvanceTo and AdvanceBy methods of the TestScheduler can allow you to coordinate with that foreign source of concurrency - but the extent to which this is achievable depends on the control afforded by that foreign source.
In the case of the TPL, you do know when a task is done - which does allow the use of waits and timeouts in tests, as ugly as these can be. Through the use of a TaskCompletionSource (TCS) you can mock tasks and use AdvanceTo to hit specific points and complete TCSs, but there is no one simple approach here. Often you just have to resort to ugly waits and timeouts because you don't have sufficient control over foreign concurrency.
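To give a flavour of the TCS approach, here is a sketch under my own assumptions: the class name is invented, the TaskCompletionSource stands in for the real asynchronous call, and the 5 second real-time limit is arbitrary. AdvanceTo brings the pipeline to the point where the task is awaited, the test then completes the TCS, and a real wait covers the hop the task continuation may make off the TestScheduler thread:

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Reactive.Testing;

// Hypothetical illustration class, not part of Rx.
public static class MockedTaskSketch
{
    public static int Run()
    {
        var scheduler = new TestScheduler();
        var tcs = new TaskCompletionSource<int>(); // mock of the async call
        var received = new ManualResetEventSlim();
        var result = 0;

        Observable
            .Timer(TimeSpan.FromSeconds(5), scheduler)
            .SelectMany(_ => tcs.Task)
            .Subscribe(x => { result = x; received.Set(); });

        // Virtual time: run up to the point where the task is awaited.
        scheduler.AdvanceTo(TimeSpan.FromSeconds(5).Ticks);

        // Complete the mocked task at a moment of the test's choosing.
        tcs.SetResult(42);

        // Real time: the continuation may run on another thread, so the
        // test has to wait for it (the ugly part).
        if (!received.Wait(TimeSpan.FromSeconds(5)))
            throw new TimeoutException("Task result was not observed in time");

        return result;
    }
}
```

MockedTaskSketch.Run() should return 42. Note that the wait is in real time, so this only gives partial control: the TestScheduler decides when the task starts being awaited, but not when its result arrives.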
Rx is generally free-threaded and tries to avoid introducing concurrency wherever possible. Conversely, it's quite possible that different operations within an Rx call chain will need different types of scheduler abstraction. It's not always possible to simulate a call chain with a single test scheduler. Certainly, I have had cause to use multiple TestSchedulers to simulate some complex scenarios - e.g. chains that use the DispatcherScheduler and TaskScheduler sometimes need complex coordination that means you can't simply serialize their operations on to one TestScheduler.
Some projects I have worked on have mandated the use of Rx for all concurrency specifically to avoid these problems. That is not always feasible, and even in these cases, some use of TPL is generally inevitable.
One particular pain point of Rx that leaves many testers scratching their heads is the fact that the TPL -> Rx family of conversions introduce concurrency. For example, ToObservable, SelectMany's overload accepting Task<T>, etc. don't provide overloads with a scheduler and insidiously force you off the TestScheduler thread, even if mocking with TCS. For all the pain this causes in testing alone, I consider this a bug. You can read all about this here - dig through and you'll find Dave Sexton's proposed fix, which provides an overload for specifying a scheduler, and is under consideration for inclusion. You may want to look into that pull request.
If you can edit your code to use it, the following helper method might be of use. It converts a task to an observable that will run on the TestScheduler and complete at the correct virtual time.
It schedules work on the TestScheduler that is responsible for collecting the task result - at the virtual time we state the task should complete. The work itself blocks until the task result is available - allowing the TPL task to run for however long it takes, or until a real amount of specified time has passed in which case a TimeoutException is thrown.
The effect of blocking the work means that the TestScheduler won't advance its virtual time past the expected virtual completion time of the task until the task has actually completed. This way, the rest of the Rx chain can run in full-speed virtual time and we only wait on the TPL task, pausing the rest of the chain at the task completion virtual time whilst this happens.
Crucially, other concurrent Rx operations scheduled to run in between the start virtual time of the Task based operation and the stated end virtual time of the Task are not blocked and their virtual completion time will be unaffected.
So set duration to the length of virtual time you want the task to appear to have taken. The result will then be collected at whatever the virtual time is when the task is started, plus the duration specified.
Set timeout to the actual time you will allow the task to take. If it takes longer, a timeout exception is thrown:
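    // Must live in a static class. Namespaces used: System, System.Threading.Tasks,
    // System.Reactive.Disposables, System.Reactive.Linq, System.Reactive.Subjects,
    // Microsoft.Reactive.Testing.
    public static IObservable<T> ToTestScheduledObseravble<T>(
        this Task<T> task,
        TestScheduler scheduler,
        TimeSpan duration,
        TimeSpan? timeout = null)
    {
        // Real (wall-clock) time the task is allowed to take.
        timeout = timeout ?? TimeSpan.FromSeconds(100);
        var subject = Subject.Synchronize(new AsyncSubject<T>(), scheduler);

        // Collect the result at the current virtual time plus duration.
        scheduler.Schedule<Task<T>>(task, duration,
            (s, t) => {
                // Block the TestScheduler until the task has finished.
                // Waiting on the handle, unlike Task.Wait, does not throw
                // for faulted or canceled tasks, so those cases reach the
                // switch below and are reported through OnError.
                if (!((IAsyncResult)task).AsyncWaitHandle.WaitOne(timeout.Value))
                {
                    subject.OnError(
                        new TimeoutException(
                            "Task duration too long"));
                }
                else
                {
                    switch (task.Status)
                    {
                        case TaskStatus.RanToCompletion:
                            subject.OnNext(task.Result);
                            subject.OnCompleted();
                            break;
                        case TaskStatus.Faulted:
                            subject.OnError(task.Exception.InnerException);
                            break;
                        case TaskStatus.Canceled:
                            subject.OnError(new TaskCanceledException(task));
                            break;
                    }
                }
                return Disposable.Empty;
            });

        return subject.AsObservable();
    }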
Usage in your code would be like this, and your assert will pass:
    Observable
        .Return(1)
        .Select(i => Whatever().ToTestScheduledObseravble(
            scheduler, TimeSpan.FromSeconds(1)))
        .Concat()
        .Subscribe(_ => Interlocked.Increment(ref count));
In summary, you haven't missed any convenient trick. You need to think about how Rx works, and how the TPL works, and decide whether to live with waits and timeouts and abandon the TestScheduler altogether, or to combine them with the TestScheduler to bring some modicum of control over your tests.