
How do I create an Rx sequence by running tasks over original sequence's values?

I have a sequence of type IObservable<T> and a function that maps T, CancellationToken to a Task<U>. What's the cleanest way of getting an IObservable<U> out of them?

I need the following semantics:

  • each task starts after the previous item's task has finished
  • if a task has been cancelled or faulted, it is skipped
  • the order of the original sequence is strictly preserved

Here's the signature as I see it:

public static IObservable<U> Select<T, U> (
    this IObservable<T> source,
    Func<T, CancellationToken, Task<U>> selector
);

I haven't written any code yet but I will unless someone beats me to it.
In any case, I'm not familiar with operators like Window, so my solution will likely be less elegant.

I need the solution in C# 4, but C# 5 answers are also welcome for the sake of comparison.


If you're curious, below is my real-world scenario, more or less:

Dropbox.GetImagesRecursively ()
    .ObserveOn (SynchronizationContext.Current)
    .Select (DownloadImage)
    .Subscribe (AddImageToFilePicker);
Dan Abramov asked Oct 21 '22 05:10


1 Answer

This seems to work for me so far:

public static IObservable<U> Select<T, U> (
    this IObservable<T> source,
    Func<T, CancellationToken, Task<U>> selector)
{
    return source
        .Select (item => 
            Observable.Defer (() => 
                Observable.StartAsync (ct => selector (item, ct))
                    .Catch (Observable.Empty<U> ())
            ))
        .Concat ();
}

Each item is mapped to a deferred, task-based observable that swallows exceptions, and the resulting observables are then concatenated.
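To see the requested semantics in action, here is a minimal console sketch. It assumes the System.Reactive package and a C# 5 or later compiler (the demo selector uses async/await for brevity). `Process`, `SelectDemo` and `Run` are hypothetical names made up for illustration; the operator itself is the one shown above, repeated so the sketch compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ObservableTaskExtensions
{
    // The operator from the answer, repeated so the sketch compiles on its own.
    public static IObservable<U> Select<T, U>(
        this IObservable<T> source,
        Func<T, CancellationToken, Task<U>> selector)
    {
        return source
            .Select(item =>
                Observable.Defer(() =>
                    Observable.StartAsync(ct => selector(item, ct))
                        .Catch(Observable.Empty<U>())))
            .Concat();
    }
}

public static class SelectDemo
{
    // Hypothetical selector: later items finish sooner, and item 3 always faults.
    static async Task<string> Process(int n, CancellationToken ct)
    {
        await Task.Delay(50 * (6 - n), ct);
        if (n == 3) throw new InvalidOperationException("item 3 failed");
        return "item " + n;
    }

    // Runs the pipeline to completion and returns everything it emitted.
    public static IList<string> Run()
    {
        return Observable.Range(1, 5)
            .Select((n, ct) => Process(n, ct))
            .ToList()
            .Wait();
    }

    public static void Main()
    {
        // Item 3 faults and is skipped; the rest keep their source order.
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

Under these assumptions the program should print `item 1, item 2, item 4, item 5`: the faulted item is dropped, and the remaining results arrive in source order even though later items have shorter delays.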


My thought process went like this.

I noticed that one of the SelectMany overloads does almost exactly what I wanted and even has exactly the same signature. It didn't satisfy my needs though:

  • it creates tasks as the original items arrive, whereas I needed each task to wait for the previous one to finish
  • it offers no option to skip canceled and faulted tasks

I looked at this overload's implementation and noticed it uses FromAsync to handle task creation and cancellation:

public virtual IObservable<TResult> SelectMany<TSource, TTaskResult, TResult> (
    IObservable<TSource> source,
    Func<TSource, CancellationToken, Task<TTaskResult>> taskSelector,
    Func<TSource, TTaskResult, TResult> resultSelector)
{
    return SelectMany_<TSource, TTaskResult, TResult> (
        source,
        x => FromAsync (ct => taskSelector (x, ct)),
        resultSelector
    );
}

I turned my eye to FromAsync to see how it was implemented, and was pleasantly surprised to find it was composable as well:

public virtual IObservable<TResult> FromAsync<TResult> (Func<CancellationToken, Task<TResult>> functionAsync)
{
    return Defer (() => StartAsync (functionAsync));
}

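I reused Defer and StartAsync, and added Catch to swallow errors; a cancelled task also surfaces as an error, so it is skipped too. Defer postpones starting each task until Concat subscribes to it, and Concat subscribes to the next inner observable only after the previous one completes, so tasks run one at a time in the original order.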
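The role of Defer can be checked with a rough experiment. This is only a sketch, assuming the System.Reactive package and C# 5 or later; `Work`, `PeakOverlap`, `Eager` and `Deferred` are hypothetical helpers that count how many tasks are in flight at once, with and without Defer.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class DeferDemo
{
    static readonly object Gate = new object();
    static int active;   // calls to Work currently in flight
    static int peak;     // highest value of 'active' seen in the current run

    // Hypothetical work item that records how many calls overlap.
    static async Task<int> Work(int n, CancellationToken ct)
    {
        lock (Gate) { active++; peak = Math.Max(peak, active); }
        try
        {
            await Task.Delay(100, ct);
            return n;
        }
        finally
        {
            lock (Gate) { active--; }
        }
    }

    // Maps four items through 'wrap', concatenates, waits, and reports the peak overlap.
    static int PeakOverlap(Func<int, IObservable<int>> wrap)
    {
        lock (Gate) { active = 0; peak = 0; }
        Observable.Range(1, 4).Select(wrap).Concat().ToList().Wait();
        lock (Gate) { return peak; }
    }

    // Without Defer: StartAsync runs Work as soon as the Select lambda is invoked.
    public static int Eager()
    {
        return PeakOverlap(n => Observable.StartAsync(ct => Work(n, ct)));
    }

    // With Defer: Work runs only when Concat subscribes to that inner observable.
    public static int Deferred()
    {
        return PeakOverlap(n =>
            Observable.Defer(() => Observable.StartAsync(ct => Work(n, ct))));
    }

    public static void Main()
    {
        Console.WriteLine("peak overlap without Defer: " + Eager());
        Console.WriteLine("peak overlap with Defer:    " + Deferred());
    }
}
```

With Defer the peak should be 1, because each task is created only when Concat reaches it. Without Defer the peak will typically be greater than 1, since Range emits all of its items, and so starts all of the tasks, before the first delay elapses.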

Dan Abramov answered Oct 31 '22 19:10