
Observable.Using with async Task

I have used Observable.Using with methods that return an IDisposable, like this:

Observable.Using(() => new Stream(), s => DoSomething(s));

But how should this be done when the stream is created asynchronously, as in this example?

Observable.Using(async () => await CreateStream(), s => DoSomething(s));

async Task<Stream> CreateStream() 
{
    ...
}

DoSomething(Stream s)
{
    ...
}

This doesn't compile because the compiler says that s is a Task<Stream> instead of a Stream.

What's the deal?

SuperJMN asked Sep 27 '17 08:09


1 Answer

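Rx does have an async overload of Observable.Using, but it expects both the resource factory and the observable factory to be asynchronous and to accept a CancellationToken, so it doesn't fit your synchronous DoSomething. Let's take a look at its source: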

Observable.FromAsync(resourceFactoryAsync)
    .SelectMany(resource => Observable.Using(() => resource, r =>
        Observable.FromAsync(ct => observableFactoryAsync(r, ct)).Merge()));

Knowing that it's just using the synchronous version under the hood, you can do something like this to adapt it to your usage:

Observable.FromAsync(CreateStream)
    .SelectMany(stream => Observable.Using(() => stream, DoSomething));

Unfortunately, Rx doesn't include an overload that pairs an async resource factory with a synchronous observable factory, but you can always create your own:

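using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class ObservableEx
{
    // Awaits the resource, then hands it to the synchronous Observable.Using,
    // which disposes it when the resulting sequence terminates or is unsubscribed.
    public static IObservable<TSource> Using<TSource, TResource>(
        Func<Task<TResource>> resourceFactoryAsync,
        Func<TResource, IObservable<TSource>> observableFactory)
        where TResource : IDisposable =>
        Observable.FromAsync(resourceFactoryAsync).SelectMany(
            resource => Observable.Using(() => resource, observableFactory));
}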

and then it's as simple as:

ObservableEx.Using(CreateStream, DoSomething);

All of this is assuming that DoSomething returns an observable, which is not mentioned in your question but is required by the contract of Observable.Using.
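
For illustration, here is a minimal end-to-end sketch of the approach above. The bodies of CreateStream and DoSomething are hypothetical stand-ins, since the question elides them: CreateStream pretends to open a MemoryStream asynchronously, and DoSomething is assumed to return an IObservable<string> that emits the stream's text. It also assumes the System.Reactive package is referenced.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;

public static class Demo
{
    // Hypothetical stand-in for the question's CreateStream:
    // simulates opening a stream asynchronously.
    public static async Task<Stream> CreateStream()
    {
        await Task.Delay(10);
        return new MemoryStream(Encoding.UTF8.GetBytes("hello"));
    }

    // Hypothetical stand-in for DoSomething: it has to return an observable.
    // Here it emits the stream's full text as a single value.
    public static IObservable<string> DoSomething(Stream s) =>
        Observable.FromAsync(() => new StreamReader(s).ReadToEndAsync());

    public static async Task Main()
    {
        // Awaiting an observable yields its last element.
        string text = await Observable.FromAsync(CreateStream)
            .SelectMany(stream => Observable.Using(() => stream, s => DoSomething(s)));

        Console.WriteLine(text); // prints "hello"
    }
}
```

The stream is disposed by Observable.Using once the inner sequence completes, so it should not be touched after the await returns.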

Taylor Buchanan answered Sep 23 '22 14:09