I have an async function that I want to invoke on every observation in an IObservable sequence, limiting delivery to one event at a time. The consumer expects no more than one message in flight, and this is also the Rx contract, if I understand it correctly.
Consider this sample:
static void Main() {
    var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
    //var d = ob.Subscribe(async x => await Consume(x)); // Does not rate-limit.
    var d = ob.Subscribe(x => Consume(x).Wait());
    Thread.Sleep(10000);
    d.Dispose();
}

static async Task<Unit> Consume(long count) {
    Console.WriteLine($"Consuming {count} on thread {Thread.CurrentThread.ManagedThreadId}");
    await Task.Delay(750);
    Console.WriteLine($"Returning on thread {Thread.CurrentThread.ManagedThreadId}");
    return Unit.Default;
}
The Consume function fakes a 750 ms processing time, and ob produces events every 100 ms. The code above works, but calls task.Wait() on a random thread. If I instead subscribe as in the commented-out line 3, then Consume is invoked at the same rate at which ob produces events (and I cannot even grok which overload of Subscribe I am using in this commented statement, so it is probably nonsense).
So how do I correctly deliver one event at a time from an observable sequence to an async function?
A common misconception in Angular development concerns whether observables are synchronous or asynchronous. Many developers, even experienced Angular developers, think that observables are always async, but in fact they can be either synchronous or asynchronous.
To use await with RxJS observables, we have to convert the observable to a promise first. To do that, we can use the firstValueFrom or lastValueFrom functions. firstValueFrom returns a promise that resolves to the first value of an observable. lastValueFrom returns a promise that resolves to the last value of an observable.
Using Async/Await in Angular
One of the best improvements in JavaScript is the async/await feature introduced in ECMAScript 2017. Async/await works on top of promises and allows you to write asynchronous code in a synchronous style. It simplifies the code and makes the flow and logic easier to follow.
As soon as the subscription happens, the values are sent to the observer. The onNext function then prints out the values. When an observer subscribes to an observable sequence, the subscribe method may be using asynchronous behavior behind the scenes depending on the operator.
Therefore, the subscribe call is asynchronous in that the caller is not blocked until the observation of the sequence completes. This will be covered in more detail in the Using Schedulers topic.
You don't want to pass an async method to Subscribe, because that will create an async void method. Do your best to avoid async void. In your case, I think what you want is to call the async method for each element of the sequence and then cache all the results.
Subscribers are not supposed to be long running, and therefore there isn't support for executing long running async methods in the Subscribe handlers.
Instead, consider your async method to be a single value observable sequence that takes a value from another sequence. Now you can compose sequences, which is what Rx was designed to do.
Now that you have made that leap, you will probably have something like what @Reijher creates in Howto call back async function from rx subscribe?.
The breakdown of his code is as follows.
// The input sequence. Produces values potentially quicker than the consumer.
Observable.Interval(TimeSpan.FromSeconds(1))
    // Project each event you receive into the result of the async method.
    .Select(l => Observable.FromAsync(() => asyncMethod(l)))
    // Ensure that the results are serialized.
    .Concat()
    // Do what you will here with the results of the async method calls.
    .Subscribe();
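As a rough, self-contained sketch of that composition applied to the question's sample: this assumes the System.Reactive package is referenced. The class name SerializedConsumer, the in-flight counters, the Take(5) cut-off, and the shortened delays are illustrative additions, not part of the original code.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical demo class: serializes calls to an async consumer with Select + FromAsync + Concat.
public static class SerializedConsumer
{
    private static readonly object Gate = new object();
    private static int inFlight;                            // calls currently running
    public static int MaxInFlight { get; private set; }     // highest concurrency observed

    // Stand-in for the question's Consume method; the counters exist only to observe concurrency.
    public static async Task<Unit> Consume(long count)
    {
        lock (Gate) { inFlight++; MaxInFlight = Math.Max(MaxInFlight, inFlight); }
        await Task.Delay(75);                               // simulated work, slower than the producer
        lock (Gate) { inFlight--; }
        return Unit.Default;
    }

    // Produces five values 10 ms apart and consumes them one at a time.
    public static async Task<int> RunAsync()
    {
        var results = await Observable.Interval(TimeSpan.FromMilliseconds(10))
            .Take(5)
            // FromAsync is deferred: Consume is only invoked when the inner sequence is subscribed.
            .Select(x => Observable.FromAsync(() => Consume(x)))
            // Concat subscribes to one inner sequence at a time, holding back the rest.
            .Concat()
            .ToList();
        return results.Count;
    }
}
```

Calling `await SerializedConsumer.RunAsync()` from an async entry point should process all five values while MaxInFlight stays at 1, because Concat does not subscribe to the next inner sequence until the previous one completes.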
In this scenario, you are creating implicit queues. In any problem where the producer is faster than the consumer, a queue is needed to collect values while they wait. Personally I prefer to make this explicit by putting data into a queue. Alternatively, you could explicitly use a Scheduler to signal that the threading model is what should pick up the slack.
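One possible way to make the queue explicit is sketched below. It is an assumption-laden illustration rather than the answer's own code: it uses System.Threading.Channels and await foreach (so roughly .NET Core 3.0 / C# 8 or later), and the names ExplicitQueue and RunAsync are hypothetical.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper: the subscriber only enqueues; a single loop dequeues and awaits the consumer.
public static class ExplicitQueue
{
    public static async Task<int> RunAsync<T>(IObservable<T> source, Func<T, Task> consume)
    {
        // Unbounded queue: a producer that stays faster than the consumer makes it grow without limit.
        var queue = Channel.CreateUnbounded<T>(new UnboundedChannelOptions { SingleReader = true });

        using (source.Subscribe(
            x => queue.Writer.TryWrite(x),          // OnNext: enqueue and return immediately
            ex => queue.Writer.TryComplete(ex),     // OnError: fault the queue
            () => queue.Writer.TryComplete()))      // OnCompleted: close the queue
        {
            int processed = 0;
            // One item at a time: the next read happens only after consume has finished.
            await foreach (var item in queue.Reader.ReadAllAsync())
            {
                await consume(item);
                processed++;
            }
            return processed;
        }
    }
}
```

The trade-off is visible here: the buffering that Concat does internally becomes a queue you can bound, inspect, or replace, at the cost of leaving the Rx pipeline for the consuming side.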
This seems to be a popular hurdle (executing async in a subscribe handler) for Rx newcomers. There are many reasons the guidance is not to put async calls in your subscriber, for example:
1. You break the error model.
2. You are mixing async models (Rx here, Task there).
3. Subscribe is the consumer of a composition of async sequences. An async method is just a single-value sequence, so by that view it can't be the end of the sequence; its result might be, though.
UPDATE
To illustrate the comment about breaking the error model, here is an update of the OP's sample.
void Main()
{
    var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
    var d = ob.Subscribe(
        x => ConsumeThrows(x).Wait(),
        ex => Console.WriteLine("I will not get hit"));
    Thread.Sleep(10000);
    d.Dispose();
}

static async Task<Unit> ConsumeThrows(long count)
{
    return await Task.FromException<Unit>(new Exception("some failure"));
    // This will have the same effect of bringing down the application.
    // throw new Exception("some failure");
}
Here we can see that if the OnNext handler were to throw, we are not protected by our Rx OnError handler. The exception would be unhandled and would most likely bring down the application.
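For contrast, here is a minimal sketch of the same failing consumer composed into the sequence with FromAsync and Concat, where the failure is expected to surface through OnError instead. It assumes the System.Reactive package; the class name ErrorModelDemo and the TaskCompletionSource used to capture the outcome are illustrative additions.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical demo class: the async failure travels through the Rx error channel.
public static class ErrorModelDemo
{
    // Same idea as ConsumeThrows above: always returns a faulted task.
    public static Task<Unit> ConsumeThrows(long count) =>
        Task.FromException<Unit>(new InvalidOperationException("some failure"));

    // Returns a short description of how the subscription ended.
    public static async Task<string> RunAsync()
    {
        var outcome = new TaskCompletionSource<string>();

        using (Observable.Interval(TimeSpan.FromMilliseconds(10))
            .Select(x => Observable.FromAsync(() => ConsumeThrows(x)))
            .Concat()
            .Subscribe(
                _ => { },                                             // no values are expected
                ex => outcome.TrySetResult("OnError: " + ex.Message), // the faulted task lands here
                () => outcome.TrySetResult("completed")))
        {
            return await outcome.Task;
        }
    }
}
```

Because the async call is now part of the composed sequence rather than hidden inside the OnNext handler, the subscriber's error handler gets a chance to react and the sequence terminates in the usual Rx way.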