Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to Subscribe with async method in Rx?

I have following code:

IObservable<Data> _source;

...

_source.Subscribe(StoreToDatabase);

private async Task StoreToDatabase(Data data) {
    await dbstuff(data);
}

However, this does not compile. Is there any way how to observe data asynchronously? I tried async void, it works, but I feel that given solution is not feasible.

I also checked Reactive Extensions Subscribe calling await, but it does not answer my question (I do not care about the SelectMany result.)

like image 597
nothrow Avatar asked May 24 '16 11:05

nothrow


People also ask

Is observable subscribe async?

An observable produces values over time. An array is created as a static set of values. In a sense, observables are asynchronous where arrays are synchronous. In the following examples, → implies asynchronous value delivery.

Can we use await with subscribe in angular?

With asyn-await pattern, we do not need neither “subscribe” nor “toPromise”. Code looks very simple and obvious.

How does async method work?

An async method runs synchronously until it reaches its first await expression, at which point the method is suspended until the awaited task is complete. In the meantime, control returns to the caller of the method, as the example in the next section shows.

What happens when you call async method?

The call to the async method starts an asynchronous task. However, because no Await operator is applied, the program continues without waiting for the task to complete. In most cases, that behavior isn't expected.


2 Answers

You don't have to care about the SelectMany result. The answer is still the same... though you need your task to have a return type (i.e. Task<T>, not Task).

Unit is essentially equivalent to void, so you can use that:

_source.SelectMany(StoreToDatabase).Subscribe();

private async Task<Unit> StoreToDatabase(Data data)
{
    await dbstuff(data);
    return Unit.Default;
}

This SelectMany overload accepts a Func<TSource, Task<TResult> meaning the resulting sequence will not complete until the task is completed.

like image 93
Charles Mager Avatar answered Oct 01 '22 11:10

Charles Mager


Late answer, but I think that the following extension methods correctly encapsulate what Charles Mager proposed in his answer:

public static IDisposable SubscribeAsync<T>(this IObservable<T> source, 
                         Func<Task> asyncAction, Action<Exception> handler = null)
{
    Func<T,Task<Unit>> wrapped = async t =>
    {
        await asyncAction();
        return Unit.Default;
    };
    if(handler == null)
        return source.SelectMany(wrapped).Subscribe(_ => { });
    else
        return source.SelectMany(wrapped).Subscribe(_ => { }, handler);
}

public static IDisposable SubscribeAsync<T>(this IObservable<T> source, 
                         Func<T,Task> asyncAction, Action<Exception> handler = null)
{
    Func<T, Task<Unit>> wrapped = async t =>
    {
        await asyncAction(t);
        return Unit.Default;
    };
    if(handler == null)
        return source.SelectMany(wrapped).Subscribe(_ => { });
    else
        return source.SelectMany(wrapped).Subscribe(_ => { }, handler);
}
like image 33
Benjol Avatar answered Oct 01 '22 10:10

Benjol