I have a method that do some work asynchronously with use of observable. I would like to know what is the best way to make this method async, so that I will be able to await on it and do some work after observable completes.
My first try was to use await on observable.
public async Task DoWorkAsync()
{
var observable = Observable.Create<int>(o =>
{
Task.Run(() =>
{
Thread.Sleep(1000);
Console.WriteLine("OnNext");
o.OnNext(1);
o.OnError(new Exception("exception in observable logic"));
//o.OnCompleted();
});
return Disposable.Empty;
});
//observable = observable.Publish().RefCount();
observable.Subscribe(i => Console.WriteLine(i));
Console.WriteLine("Awaiting...");
await observable;
Console.WriteLine("After Awaiting...");
}
Depending on the scenario I had different issues with that approach (+/- means that this part of code is uncommented/commented):
+OnNext +OnCompleted -OnError -RefCount: OnNext was invoked 2 times (observable was subscribed 2 times). This is what I would like to avoid.
+OnNext +OnCompleted -OnError +RefCount: When I use RefCount() method the code works.
-OnNext +OnCompleted -OnError +RefCount: "Sequence contains no element" exception is thrown when my observable doesn't raise OnNext.
+OnNext -OnCompleted +OnError -RefCount: OnNext was invoked 2 times. Exception raised.
+OnNext -OnCompleted +OnError +RefCount: Hangs after displaying 1 (probably because it wants to return to thread that is awaited). We can make it work (and raise exception) by using SubscribeOn(ThreadPoolScheduler.Instance)
Anyway in case when observable is empty (no OnNext rised) we get exception even if OnError is not called and we don't have any exception inside observable logic. Thats why awaiting on observable is not good solution.
That is why I tried another solution using TaskCompletionSource
public async Task DoWorkAsync()
{
var observable = Observable.Create<int>(o =>
{
Task.Run(() =>
{
Thread.Sleep(1000);
Console.WriteLine("OnNext");
o.OnNext(1);
o.OnError(new Exception("exception in observable logic"));
//o.OnCompleted();
});
return Disposable.Empty;
});
var tcs = new TaskCompletionSource<bool>();
observable.Subscribe(i => Console.WriteLine(i),
e =>
{
//tcs.TrySetException(e);
tcs.SetResult(false);
},
() => tcs.TrySetResult(true));
Console.WriteLine("Awaiting...");
await tcs.Task;
Console.WriteLine("After Awaiting...");
}
It works ok in all scenarios and in case of OnError is invoked we could either use tcs.SetResult(false) and don't have information about exception details in outside method or we could use tcs.TrySetException(e) and be able to catch the exception in the outside method.
Can you suggest me if there is some better/cleaner solution or my second solution is the way to go?
EDIT
So I would like to know if there is a better solution than my second solution that will:
EDIT:
If you remove the subscription you can do the following:
await observable.Do(i => Console.WriteLine(i)).LastOrDefaultAsync();
As for your arbitrary requirements... Not having multiple subscriptions for a cold observable makes sense; so you publish it. Refusing to use .Publish().Refcount()
doesn't make sense. I don't understand why you're rejecting a solution that solves your problem.
There's a lot there, but I'm assuming this is your main question:
Anyway in case when observable is empty (no OnNext rised) we get exception even if OnError is not called and we don't have any exception inside observable logic. Thats why awaiting on observable is not good solution.
await observable
is the same as await observable.LastAsync()
. So if there is no element, you get an exception. Imagine changing that statement to int result = await observable;
What should the value of result
be if there's no elements?
If you change await observable;
to await observable.LastOrDefaultAsync();
everything should run smoothly.
And yes, you should use .Publish().Refcount()
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With