Given:
IObservable<T> src
async Task F(T){...}
F can only be called sequentially. So await F(x); await F(y); is fine, but Task.Factory.ContinueWhenAll(new[]{F(x),F(y)}, _ => {...}); is wrong, because F(x) and F(y) must not run concurrently.
It is clear to me that await src.Do(F) is wrong, because it would run F concurrently.
My question is how to do it correctly?
Observable.Do is for side effects only, not for sequential composition. SelectMany may be what you want. As of Rx 2.0, there are overloads of SelectMany that make it really easy to compose observables with Task<T>. (Just be aware of the additional concurrency that may be introduced by these and similar Task/Observable coop operators.)
var q = from value in src
        from _ in F(value.X).AsVoidAsync() // See helper definition below
        from __ in F(value.Y).AsVoidAsync()
        select value;
However, based on the fact that you specifically asked about the Do operator, I suspect that src may contain more than one value and you don't want overlapping calls to F for each value either. In that case, consider that SelectMany is actually just like Select->Merge; therefore, what you probably want is Select->Concat.
// using System.Reactive.Threading.Tasks
var q = src.Select(x => Observable.Defer(() => F(x).ToObservable())).Concat();
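Don't forget to use Defer, since F(x) is hot: calling it starts the task immediately, so without Defer every F(x) would start as soon as src produced its value rather than when Concat subscribes to it.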
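To make the Select->Defer->Concat behaviour concrete, here is a minimal sketch that counts how many calls are in flight at once. The class name SequentialDemo, the counters, and the body of F are hypothetical stand-ins (the question does not show F), and the code assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class SequentialDemo
{
    private static readonly object Gate = new object();
    private static int _running;     // calls to F currently in flight
    private static int _maxRunning;  // highest value _running has reached

    // Hypothetical stand-in for the question's F: records overlap, then does async work.
    private static async Task F(int x)
    {
        lock (Gate)
        {
            _running++;
            _maxRunning = Math.Max(_maxRunning, _running);
        }
        await Task.Delay(20);
        lock (Gate)
        {
            _running--;
        }
    }

    // Runs F over five values and returns the peak number of overlapping calls.
    public static async Task<int> RunAsync()
    {
        IObservable<int> src = Observable.Range(1, 5);

        // Defer postpones the call to F(x) until Concat subscribes to that inner
        // sequence, which only happens after the previous one has completed.
        var q = src.Select(x => Observable.Defer(() => F(x).ToObservable())).Concat();

        // Awaiting the observable subscribes to it and waits for completion.
        await q.LastOrDefaultAsync();
        return _maxRunning;
    }
}
```

With this setup the peak should stay at 1. Dropping the Defer wrapper would be expected to let all five calls start while src is still emitting, because each F(x) would run inside the Select lambda.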
AsVoidAsync Extension:
IObservable<T> requires a T, yet Task represents void, thus Rx's conversion operators require us to get a Task<T> from a Task. I tend to use Rx's System.Reactive.Unit struct for T:
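using System.Reactive;          // Unit
using System.Threading.Tasks;

public static class TaskExtensions
{
    // Converts a non-generic Task into a Task<Unit> so Rx's Task<T> operators can consume it.
    public static Task<Unit> AsVoidAsync(this Task task)
    {
        return task.ContinueWith(t =>
        {
            var tcs = new TaskCompletionSource<Unit>();
            if (t.IsCanceled)
            {
                tcs.SetCanceled();
            }
            else if (t.IsFaulted)
            {
                // Pass the inner exceptions so they are not wrapped in a second AggregateException.
                tcs.SetException(t.Exception.InnerExceptions);
            }
            else
            {
                tcs.SetResult(Unit.Default);
            }
            return tcs.Task;
        },
        TaskContinuationOptions.ExecuteSynchronously)
        .Unwrap();
    }
}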
Alternatively, you could just call the ToObservable overload for the non-generic Task instead, which returns an IObservable<Unit>.
The easiest way is to use TPL Dataflow, which will throttle to a single concurrent invocation by default, even for asynchronous methods:
var block = new ActionBlock<T>(F);
src.Subscribe(block.AsObserver());
await block.Completion;
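As a rough illustration of the Dataflow approach, the sketch below logs when each call starts and ends so the ordering can be inspected. The class name DataflowDemo, the log, and the delegate f (a stand-in for the question's F) are assumptions for the example. It assumes the System.Reactive and System.Threading.Tasks.Dataflow packages are referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowDemo
{
    // Returns the order in which calls to the stand-in f started and finished.
    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();
        var gate = new object();

        // Hypothetical stand-in for the question's F.
        Func<int, Task> f = async x =>
        {
            lock (gate) { log.Add("start " + x); }
            await Task.Delay(10);
            lock (gate) { log.Add("end " + x); }
        };

        IObservable<int> src = Observable.Range(1, 3);

        // MaxDegreeOfParallelism defaults to 1, so the block waits for the task
        // returned by f to finish before it takes the next item.
        var block = new ActionBlock<int>(f);

        // AsObserver posts each OnNext value to the block and completes the
        // block when the source signals OnCompleted.
        using (src.Subscribe(block.AsObserver()))
        {
            await block.Completion;
        }

        return log;
    }
}
```

Typing f as Func<int, Task> is a deliberate choice here: it selects the asynchronous ActionBlock constructor explicitly instead of relying on overload resolution for a method group.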
Alternatively, you could subscribe an asynchronous method that you throttle yourself:
var semaphore = new SemaphoreSlim(1);
// Subscribe (rather than Do) so the handler is actually attached to src.
src.Subscribe(async x =>
{
    await semaphore.WaitAsync();
    try
    {
        await F(x);
    }
    finally
    {
        semaphore.Release();
    }
});