
Subscribing to observable sequence with async function

I have an async function that I want to invoke for every element of an IObservable sequence, limiting delivery to one event at a time. The consumer expects no more than one message in flight, and this is also the Rx contract, if I understand it correctly.

Consider this sample:

static void Main() {
  var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
  //var d = ob.Subscribe(async x => await Consume(x));  // Does not rate-limit.
  var d = ob.Subscribe(x => Consume(x).Wait());
  Thread.Sleep(10000);
  d.Dispose();
}

static async Task<Unit> Consume(long count) {
  Console.WriteLine($"Consuming {count} on thread {Thread.CurrentThread.ManagedThreadId}");
  await Task.Delay(750);
  Console.WriteLine($"Returning on thread {Thread.CurrentThread.ManagedThreadId}");
  return Unit.Default;
}

The Consume function fakes a 750 ms processing time, and ob produces events every 100 ms. The code above works, but it blocks a random thread by calling Wait() on the task. If I instead subscribe as in the commented-out line 3, then Consume is invoked at the same rate at which ob produces events (and I cannot even grok which overload of Subscribe I am using in this commented-out statement, so it is probably nonsense).

So how do I correctly deliver one event at a time from an observable sequence to an async function?

asked May 10 '16 04:05 by kkm



1 Answer

Subscribers are not supposed to be long-running, and therefore there is no support for executing long-running async methods in Subscribe handlers.

Instead, consider your async method to be a single value observable sequence that takes a value from another sequence. Now you can compose sequences, which is what Rx was designed to do.

Now that you have made that leap, you will probably end up with something like what @Reijher creates in "Howto call back async function from rx subscribe?".

The breakdown of his code is as follows.

//The input sequence. Produces values potentially quicker than consumer
Observable.Interval(TimeSpan.FromSeconds(1))
      //Project the event you receive, into the result of the async method
      .Select(l => Observable.FromAsync(() => asyncMethod(l)))
      //Ensure that the results are serialized
      .Concat()
      //do what you will here with the results of the async method calls
      .Subscribe();

In this scenario, you are creating implicit queues. In any problem where the producer is faster than the consumer, a queue is needed to collect values while they wait. Personally, I prefer to make this explicit by putting the data into a queue. Alternatively, you could explicitly use a Scheduler to signal which threading model should pick up the slack.
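Applied to the sample from the question, the composition above might look like the following. This is a minimal sketch, assuming the System.Reactive package is referenced; the Program class wrapper and the console messages are illustrative and not part of the original answer. Values that arrive while a Consume call is still running wait in the implicit queue inside Concat, which is unbounded.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

static class Program
{
    static void Main()
    {
        // Producer: one value every 100 ms, faster than the consumer can handle.
        var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));

        var d = ob
            // Wrap each call in a deferred single-value sequence; nothing runs yet.
            .Select(x => Observable.FromAsync(() => Consume(x)))
            // Concat subscribes to the next inner sequence only after the
            // previous one completes, so at most one Consume call is in flight.
            .Concat()
            .Subscribe(
                _ => { },                                          // result of each Consume call
                ex => Console.WriteLine($"Failed: {ex.Message}")); // errors from Consume arrive here

        Thread.Sleep(10000);
        d.Dispose();
    }

    // Same shape as the question's Consume: fakes 750 ms of processing.
    static async Task<Unit> Consume(long count)
    {
        Console.WriteLine($"Consuming {count}");
        await Task.Delay(750);
        Console.WriteLine($"Returned from {count}");
        return Unit.Default;
    }
}
```

With this arrangement, each "Consuming" line should be followed by its "Returned" line before the next "Consuming" line appears, and no thread is blocked in Wait().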

This seems to be a popular hurdle (executing async code in a Subscribe handler) for Rx newcomers. There are many reasons why the guidance is not to put async calls in your subscriber, for example:

1. You break the error model.
2. You are mixing async models (Rx here, Task there).
3. Subscribe is the consumer of a composition of async sequences. An async method is just a single-value sequence, so by that view it can't be the end of the sequence; its result might be, though.

UPDATE

To illustrate the comment about breaking the error model, here is an update of the OP's sample.

void Main()
{
    var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
    var d = ob.Subscribe(
        x => ConsumeThrows(x).Wait(),
        ex => Console.WriteLine("I will not get hit"));

    Thread.Sleep(10000);
    d.Dispose();
}

static async Task<Unit> ConsumeThrows(long count)
{
    return await Task.FromException<Unit>(new Exception("some failure"));
    //this will have the same effect of bringing down the application.
    //throw new Exception("some failure");
}

Here we can see that if the OnNext handler were to throw, we would not be protected by our Rx OnError handler. The exception would be unhandled and would most likely bring down the application.
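For contrast, here is a sketch of the same failing call moved out of the OnNext handler and into the composition, again assuming the System.Reactive package. The Program wrapper and the message strings are illustrative. Because the faulted task now surfaces as a failed inner sequence, the error should flow through Concat to the OnError handler instead of escaping as an unhandled exception.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

static class Program
{
    static void Main()
    {
        var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));

        var d = ob
            // The async call is part of the sequence, not the subscriber.
            .Select(x => Observable.FromAsync(() => ConsumeThrows(x)))
            .Concat()
            .Subscribe(
                _ => Console.WriteLine("No value is expected here"),
                // The failure of the first inner sequence terminates the
                // composed sequence and is delivered to this handler.
                ex => Console.WriteLine($"OnError received: {ex.Message}"));

        Thread.Sleep(1000);
        d.Dispose();
    }

    // Always fails, like ConsumeThrows in the sample above.
    static Task<Unit> ConsumeThrows(long count) =>
        Task.FromException<Unit>(new Exception("some failure"));
}
```

Once OnError fires the sequence is terminated, so any retry or recovery policy would have to be composed in before Subscribe.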

answered Sep 28 '22 00:09 by Lee Campbell