I would like to call an async function from within an Rx subscription.
For example:
    public class Consumer
    {
        private readonly Service _service = new Service();

        public ReplaySubject<string> Results = new ReplaySubject<string>();

        public void Trigger()
        {
            Observable.Timer(TimeSpan.FromMilliseconds(100)).Subscribe(async _ => await RunAsync());
        }

        public Task RunAsync()
        {
            return _service.DoAsync();
        }
    }

    public class Service
    {
        public async Task<string> DoAsync()
        {
            return await Task.Run(() => Do());
        }

        private static string Do()
        {
            Thread.Sleep(TimeSpan.FromMilliseconds(200));
            throw new ArgumentException("invalid!");
            return "foobar";
        }
    }

    [Test]
    public async Task Test()
    {
        var sut = new Consumer();
        sut.Trigger();
        var result = await sut.Results.FirstAsync();
    }
What needs to be done, in order to catch the exception properly?
The simplest way to execute a method asynchronously is to start executing the method by calling the delegate's BeginInvoke method, do some work on the main thread, and then call the delegate's EndInvoke method. EndInvoke might block the calling thread because it does not return until the asynchronous call completes.
A common misconception in Angular development concerns whether observables are synchronous or asynchronous. Many developers, even experienced ones, assume that observables are always async, but in fact they can be either synchronous or asynchronous.
As you may know, subscriptions are often used to handle async method calls. In that case, the code inside the subscribe() method is executed only when the async method returns its result (after an HTTP call, for instance). While waiting for the async response, the program continues and executes the code that follows.
Ana Betts' answer works in most scenarios, but if you want to block the stream while waiting for the async function to finish you need something like this:
    Observable.Interval(TimeSpan.FromSeconds(1))
        .Select(l => Observable.FromAsync(asyncMethod))
        .Concat()
        .Subscribe();
Or:
    Observable.Interval(TimeSpan.FromSeconds(1))
        .Select(_ => Observable.Defer(() => asyncMethod().ToObservable()))
        .Concat()
        .Subscribe();
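To make the "block the stream" behaviour concrete, here is a minimal sketch, assuming the System.Reactive package is referenced. `ConcatDemo`, `WorkAsync` and the timings are hypothetical names and values chosen for illustration. The work takes longer than the interval, yet because Concat subscribes to the next inner observable only after the previous one has completed, the recorded overlap should stay at one invocation at a time.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class ConcatDemo
{
    private static readonly object Gate = new object();
    private static int _running;
    private static int _maxRunning;

    // Hypothetical async work that records how many invocations overlap.
    private static async Task WorkAsync()
    {
        lock (Gate)
        {
            _running++;
            _maxRunning = Math.Max(_maxRunning, _running);
        }

        // Deliberately slower than the 100 ms interval below.
        await Task.Delay(TimeSpan.FromMilliseconds(300));

        lock (Gate)
        {
            _running--;
        }
    }

    // Returns the highest number of WorkAsync calls seen running at once.
    public static async Task<int> RunAsync()
    {
        await Observable.Interval(TimeSpan.FromMilliseconds(100))
            .Take(3)
            // FromAsync is lazy: the task starts only when Concat subscribes.
            .Select(_ => Observable.FromAsync(() => WorkAsync()))
            .Concat()
            .LastOrDefaultAsync();

        lock (Gate)
        {
            return _maxRunning;
        }
    }
}
```

Replacing `.Concat()` with `.Merge()` in this sketch would let the invocations overlap, which is the difference the answer above is pointing at.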
Change this to:
    Observable.Timer(TimeSpan.FromMilliseconds(100))
        .SelectMany(async _ => await RunAsync())
        .Subscribe();
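Subscribe doesn't keep the async operation inside the Observable: an async lambda passed to Subscribe is compiled as an async void handler, so an exception thrown by the awaited task never reaches the observable's error channel. With SelectMany, the task becomes part of the sequence, and a failure is delivered to subscribers as OnError.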
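Applied to the question's Consumer, this could look like the sketch below. It assumes the System.Reactive package and makes two changes that are not in the original code: RunAsync returns Task<string> rather than Task, and the hypothetical RunAsync body stands in for Service.DoAsync. Results and errors are forwarded into the ReplaySubject so that awaiting Results.FirstAsync() observes the failure.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

public class Consumer
{
    public ReplaySubject<string> Results { get; } = new ReplaySubject<string>();

    public void Trigger()
    {
        Observable.Timer(TimeSpan.FromMilliseconds(100))
            // The task is now part of the sequence, so a fault becomes OnError.
            .SelectMany(_ => RunAsync())
            .Subscribe(
                value => Results.OnNext(value),
                error => Results.OnError(error));
    }

    // Hypothetical stand-in for the question's Service.DoAsync (always fails).
    private static async Task<string> RunAsync()
    {
        await Task.Delay(TimeSpan.FromMilliseconds(200));
        throw new ArgumentException("invalid!");
    }
}

public static class Program
{
    public static async Task Main()
    {
        var sut = new Consumer();
        sut.Trigger();

        try
        {
            // Awaiting the observable rethrows the error pushed via OnError.
            await sut.Results.FirstAsync();
        }
        catch (ArgumentException ex)
        {
            Console.WriteLine($"Caught: {ex.Message}");
        }
    }
}
```

In a test, the try/catch would typically be replaced by the test framework's own assertion for thrown exceptions.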