
How to call back async function from Rx Subscribe?

I would like to call an async function from within an Rx subscription, for example like this:

    public class Consumer
    {
        private readonly Service _service = new Service();

        public ReplaySubject<string> Results = new ReplaySubject<string>();

        public void Trigger()
        {
            Observable.Timer(TimeSpan.FromMilliseconds(100)).Subscribe(async _ => await RunAsync());
        }

        public Task RunAsync()
        {
            return _service.DoAsync();
        }
    }

    public class Service
    {
        public async Task<string> DoAsync()
        {
            return await Task.Run(() => Do());
        }

        private static string Do()
        {
            Thread.Sleep(TimeSpan.FromMilliseconds(200));
            throw new ArgumentException("invalid!");
            return "foobar";
        }
    }

    [Test]
    public async Task Test()
    {
        var sut = new Consumer();
        sut.Trigger();
        var result = await sut.Results.FirstAsync();
    }

What needs to be done in order to catch the exception properly?

Martin Komischke asked Apr 11 '14 08:04




2 Answers

Ana Betts' answer works in most scenarios, but if you want to block the stream while waiting for the async function to finish, so that each call completes before the next one starts, you need something like this:

    Observable.Interval(TimeSpan.FromSeconds(1))
        .Select(l => Observable.FromAsync(asyncMethod))
        .Concat()
        .Subscribe();

Or:

    Observable.Interval(TimeSpan.FromSeconds(1))
        .Select(_ => Observable.Defer(() => asyncMethod().ToObservable()))
        .Concat()
        .Subscribe();
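To illustrate what "blocking the stream" means here, the following is a minimal sketch, assuming the System.Reactive package is referenced. `SequentialAsyncDemo`, `SlowAsync`, and the timings are hypothetical names and values chosen for illustration; they are not part of the original answer. The async work deliberately takes longer than the tick interval, and the sketch records how many calls were ever in flight at the same time.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class SequentialAsyncDemo
{
    private static readonly object Gate = new object();
    private static int _running;
    private static int _maxRunning;

    // Hypothetical async work that takes longer than the 50 ms tick interval.
    private static async Task<long> SlowAsync(long tick)
    {
        lock (Gate)
        {
            _running++;
            _maxRunning = Math.Max(_maxRunning, _running);
        }

        await Task.Delay(150);

        lock (Gate)
        {
            _running--;
        }

        return tick;
    }

    // Runs four ticks through Select + FromAsync + Concat and returns the
    // highest number of SlowAsync calls that were in flight at once.
    public static async Task<int> MaxConcurrencyAsync()
    {
        lock (Gate)
        {
            _running = 0;
            _maxRunning = 0;
        }

        // FromAsync is deferred: the task starts only when Concat subscribes
        // to the inner observable, which happens after the previous one completes.
        await Observable.Interval(TimeSpan.FromMilliseconds(50))
            .Take(4)
            .Select(tick => Observable.FromAsync(() => SlowAsync(tick)))
            .Concat()
            .ToList();

        lock (Gate)
        {
            return _maxRunning;
        }
    }
}
```

Under these assumptions `MaxConcurrencyAsync` should report 1, because `Concat` queues the later ticks instead of starting their calls early. Note that the ticks themselves keep arriving while a call is running; they are buffered, not dropped.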
reijerh answered Sep 22 '22 18:09



Change the subscription in Trigger to:

    Observable.Timer(TimeSpan.FromMilliseconds(100))
        .SelectMany(async _ => await RunAsync())
        .Subscribe();

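Subscribe doesn't keep the async operation inside the Observable: an async lambda passed to Subscribe becomes an async void handler, so an exception thrown by the awaited task never reaches the subscription's OnError. With SelectMany the task is part of the sequence, and a faulted task surfaces as an OnError notification that an error handler passed to Subscribe can catch.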
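As a rough illustration of catching the exception this way, here is a minimal sketch, assuming the System.Reactive package is referenced. `SelectManyErrorDemo`, `FailingAsync`, and `CaptureErrorAsync` are hypothetical names; `FailingAsync` stands in for `Service.DoAsync` from the question and always throws.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class SelectManyErrorDemo
{
    // Hypothetical stand-in for Service.DoAsync from the question: always faults.
    private static async Task<string> FailingAsync()
    {
        await Task.Delay(50);
        throw new ArgumentException("invalid!");
    }

    // Returns the exception delivered to the subscription's onError handler.
    public static async Task<Exception> CaptureErrorAsync()
    {
        var tcs = new TaskCompletionSource<Exception>();

        Observable.Timer(TimeSpan.FromMilliseconds(100))
            // The task is part of the sequence, so its fault becomes OnError.
            .SelectMany(_ => FailingAsync())
            .Subscribe(
                result => { /* no value is expected in this sketch */ },
                ex => tcs.TrySetResult(ex),
                () => tcs.TrySetException(
                    new InvalidOperationException("completed without an error")));

        return await tcs.Task;
    }
}
```

In the question's `Consumer`, the same onError handler could forward the exception to `Results.OnError`, so that the awaiting test observes it; that wiring is a suggestion, not something the original answer spells out.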

Ana Betts answered Sep 19 '22 18:09
