I have used Observable.Using with methods that return an IDisposable, like this:
Observable.Using(() => new Stream(), s => DoSomething(s));
But how should I proceed when the stream is created asynchronously? For example:
Observable.Using(async () => await CreateStream(), s => DoSomething(s));
async Task<Stream> CreateStream()
{
...
}
DoSomething(Stream s)
{
...
}
This doesn't compile because the compiler reports that s is a Task<Stream> instead of a Stream.
What's the deal?
Let's take a look at the source for the async overload of Observable.Using:
Observable.FromAsync(resourceFactoryAsync)
    .SelectMany(resource => Observable.Using(() => resource, r =>
        Observable.FromAsync(ct => observableFactoryAsync(r, ct)).Merge()));
Knowing that it's just using the synchronous version under the hood, you can do something like this to adapt it to your usage:
Observable.FromAsync(CreateStream)
    .SelectMany(stream => Observable.Using(() => stream, DoSomething));
It's unfortunate an overload for this isn't included, but you could always create your own:
public static class ObservableEx
{
    public static IObservable<TSource> Using<TSource, TResource>(
        Func<Task<TResource>> resourceFactoryAsync,
        Func<TResource, IObservable<TSource>> observableFactory)
        where TResource : IDisposable =>
        Observable.FromAsync(resourceFactoryAsync).SelectMany(
            resource => Observable.Using(() => resource, observableFactory));
}
and then it's as simple as:
ObservableEx.Using(CreateStream, DoSomething);
All of this assumes that DoSomething returns an observable, which is not mentioned in your question but is required by the contract of Observable.Using.
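To make that assumption concrete, here is a minimal end-to-end sketch. The bodies of CreateStream and DoSomething are hypothetical stand-ins (the question elides them): CreateStream is assumed to return an in-memory stream after a short delay, and DoSomething is assumed to return an IObservable<string> that emits the stream's full contents. It assumes the System.Reactive package is referenced.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;

public static class Program
{
    // Hypothetical stand-in for the question's CreateStream:
    // produces a stream asynchronously.
    static async Task<Stream> CreateStream()
    {
        await Task.Delay(10); // simulate asynchronous setup work
        return new MemoryStream(Encoding.UTF8.GetBytes("hello from the stream"));
    }

    // Hypothetical stand-in for the question's DoSomething:
    // it must return an observable to satisfy Observable.Using.
    // Here it emits the stream's entire contents as a single string.
    static IObservable<string> DoSomething(Stream s) =>
        Observable.FromAsync(() => new StreamReader(s).ReadToEndAsync());

    public static async Task Main()
    {
        // Create the resource asynchronously, then hand it to the
        // synchronous Observable.Using, which disposes the stream
        // when the inner observable terminates or is unsubscribed.
        IObservable<string> source = Observable
            .FromAsync(CreateStream)
            .SelectMany(stream => Observable.Using(() => stream, DoSomething));

        // Awaiting an observable yields its last element.
        string text = await source;
        Console.WriteLine(text);
    }
}
```

Because Observable.FromAsync is deferred, each subscription to source calls CreateStream again, so every subscriber gets its own stream and its own disposal.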