I have a sequence of type IObservable<T> and a function that maps T, CancellationToken to a Task<U>. What's the cleanest way of getting an IObservable<U> out of them?
I need the following semantics:
Here's the signature as I see it:
    public static IObservable<U> Select<T, U> (
        this IObservable<T> source,
        Func<T, CancellationToken, Task<U>> selector
    );
I haven't written any code yet but I will unless someone beats me to it.
In any case, I'm not familiar with operators like Window, so my solution will likely be less elegant.
I need the solution in C# 4, but C# 5 answers are also welcome for the sake of comparison.
If you're curious, below is my real-world scenario, more or less:
    Dropbox.GetImagesRecursively ()
        .ObserveOn (SynchronizationContext.Current)
        .Select (DownloadImage)
        .Subscribe (AddImageToFilePicker);
This seems to work for me so far:
    public static IObservable<U> Select<T, U> (
        this IObservable<T> source,
        Func<T, CancellationToken, Task<U>> selector)
    {
        return source
            .Select (item =>
                Observable.Defer (() =>
                    Observable.StartAsync (ct => selector (item, ct))
                        .Catch (Observable.Empty<U> ())
                ))
            .Concat ();
    }
Each item is mapped to a deferred, task-based observable that swallows exceptions, and these observables are then concatenated.
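To see the behaviour in action, here is a minimal console sketch. The operator is the same as above, only renamed to SelectSequentially (a hypothetical name, chosen so it cannot be confused with the built-in Select). The Slow selector, its delays and the failure on item 2 are all made up for the demo, and it uses async/await and Task.Delay, so it needs C# 5 and .NET 4.5 even though the operator itself does not.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

static class ObservableTaskExtensions
{
    // Same body as the operator above; only the name is changed (hypothetical).
    public static IObservable<U> SelectSequentially<T, U>(
        this IObservable<T> source,
        Func<T, CancellationToken, Task<U>> selector)
    {
        return source
            .Select(item =>
                Observable.Defer(() =>
                    Observable.StartAsync(ct => selector(item, ct))
                        .Catch(Observable.Empty<U>())))
            .Concat();
    }
}

static class Demo
{
    // Hypothetical selector: earlier items take longer, and item 2 fails.
    static async Task<string> Slow(int x, CancellationToken ct)
    {
        await Task.Delay((4 - x) * 50, ct);
        if (x == 2) throw new InvalidOperationException("boom");
        return "item " + x;
    }

    public static IList<string> Run()
    {
        return Observable.Range(1, 3)
            .SelectSequentially((x, ct) => Slow(x, ct))
            .ToList()   // collect everything that comes out
            .Wait();    // block until the sequence completes (demo only)
    }

    static void Main()
    {
        // Expected to print "item 1, item 3": source order is kept even though
        // later items have shorter delays, and item 2's exception is swallowed.
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

Blocking with Wait is only for the sake of a console demo; in the real scenario you would Subscribe as usual.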
My thought process went like this.
I noticed that one of the SelectMany overloads does almost exactly what I wanted and even has exactly the same signature. It didn't satisfy my needs though:
I looked at this overload's implementation and noticed it uses FromAsync to handle task creation and cancellation:
    public virtual IObservable<TResult> SelectMany<TSource, TTaskResult, TResult> (
        IObservable<TSource> source,
        Func<TSource, CancellationToken, Task<TTaskResult>> taskSelector,
        Func<TSource, TTaskResult, TResult> resultSelector)
    {
        return SelectMany_<TSource, TTaskResult, TResult> (
            source,
            x => FromAsync (ct => taskSelector (x, ct)),
            resultSelector
        );
    }
I turned my eye to FromAsync to see how it was implemented, and was pleasantly surprised to find it was composable as well:
    public virtual IObservable<TResult> FromAsync<TResult> (Func<CancellationToken, Task<TResult>> functionAsync)
    {
        return Defer (() => StartAsync (functionAsync));
    }
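I reused Defer and StartAsync, while also adding Catch to swallow errors. The combination of Defer and Concat ensures that tasks wait for each other and start in the original order: Defer postpones creating each task until its observable is subscribed to, and Concat subscribes to the next observable only after the previous one has completed.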