I have following code:
IObservable<Data> _source;
...
_source.Subscribe(StoreToDatabase);
private async Task StoreToDatabase(Data data) {
await dbstuff(data);
}
However, this does not compile. Is there any way how to observe data asynchronously? I tried async void
, it works, but I feel that given solution is not feasible.
I also checked Reactive Extensions Subscribe calling await, but it does not answer my question (I do not care about the SelectMany
result.)
An observable produces values over time. An array is created as a static set of values. In a sense, observables are asynchronous where arrays are synchronous. In the following examples, → implies asynchronous value delivery.
With asyn-await pattern, we do not need neither “subscribe” nor “toPromise”. Code looks very simple and obvious.
An async method runs synchronously until it reaches its first await expression, at which point the method is suspended until the awaited task is complete. In the meantime, control returns to the caller of the method, as the example in the next section shows.
The call to the async method starts an asynchronous task. However, because no Await operator is applied, the program continues without waiting for the task to complete. In most cases, that behavior isn't expected.
You don't have to care about the SelectMany
result. The answer is still the same... though you need your task to have a return type (i.e. Task<T>
, not Task
).
Unit
is essentially equivalent to void
, so you can use that:
_source.SelectMany(StoreToDatabase).Subscribe();
private async Task<Unit> StoreToDatabase(Data data)
{
await dbstuff(data);
return Unit.Default;
}
This SelectMany
overload accepts a Func<TSource, Task<TResult>
meaning the resulting sequence will not complete until the task is completed.
Late answer, but I think that the following extension methods correctly encapsulate what Charles Mager proposed in his answer:
public static IDisposable SubscribeAsync<T>(this IObservable<T> source,
Func<Task> asyncAction, Action<Exception> handler = null)
{
Func<T,Task<Unit>> wrapped = async t =>
{
await asyncAction();
return Unit.Default;
};
if(handler == null)
return source.SelectMany(wrapped).Subscribe(_ => { });
else
return source.SelectMany(wrapped).Subscribe(_ => { }, handler);
}
public static IDisposable SubscribeAsync<T>(this IObservable<T> source,
Func<T,Task> asyncAction, Action<Exception> handler = null)
{
Func<T, Task<Unit>> wrapped = async t =>
{
await asyncAction(t);
return Unit.Default;
};
if(handler == null)
return source.SelectMany(wrapped).Subscribe(_ => { });
else
return source.SelectMany(wrapped).Subscribe(_ => { }, handler);
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With