Why is it possible to await an Rx observable? [duplicate]

I have just noticed that the await keyword can be used with an Rx observable, for example:

await Observable.Interval(TimeSpan.FromHours(1));

I was pretty sure that it can only be used in conjunction with Tasks.

So what makes it possible? Is knowledge of observables hard-coded into the compiler?

mark asked Dec 11 '14 03:12


2 Answers

No, the compiler has no special knowledge of IObservable<T>. As per section 7.7.7.1 of the C# 5 specification, if the object has a method named GetAwaiter, or an extension method of that name is in scope, and it returns a type that implements System.Runtime.CompilerServices.INotifyCompletion, the object can be awaited. See Stephen Toub's article, "await anything;".

More specifically, from the spec:

The task of an await expression is required to be awaitable. An expression t is awaitable if one of the following holds:
- t is of compile time type dynamic
- t has an accessible instance or extension method called GetAwaiter with no parameters and no type parameters, and a return type A for which all of the following hold:
  1. A implements the interface System.Runtime.CompilerServices.INotifyCompletion (hereafter known as INotifyCompletion for brevity)
  2. A has an accessible, readable instance property IsCompleted of type bool
  3. A has an accessible instance method GetResult with no parameters and no type parameters
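
To make the rules quoted above concrete, here is a minimal sketch of a type that satisfies them without involving Task or Rx at all. The names Answer, AnswerAwaiter and AwaiterDemo are hypothetical and exist only for illustration; the awaiter always reports itself as completed, which keeps the example synchronous.

```csharp
using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

// Hypothetical type: it does not derive from Task or any framework base class.
public sealed class Answer
{
    public int Value { get; }
    public Answer(int value) { Value = value; }

    // Accessible instance method, no parameters, no type parameters.
    public AnswerAwaiter GetAwaiter() { return new AnswerAwaiter(Value); }
}

// The compiler binds to these members by name and shape.
public struct AnswerAwaiter : INotifyCompletion
{
    private readonly int _value;
    public AnswerAwaiter(int value) { _value = value; }

    // Always complete, so the awaiting method continues synchronously.
    public bool IsCompleted { get { return true; } }

    // Only used when IsCompleted is false; here we simply run the continuation.
    public void OnCompleted(Action continuation) { continuation(); }

    // The value of the await expression.
    public int GetResult() { return _value; }
}

public static class AwaiterDemo
{
    public static async Task<int> ComputeAsync()
    {
        int x = await new Answer(42);
        return x + 1;
    }

    public static void Main()
    {
        Console.WriteLine(ComputeAsync().GetAwaiter().GetResult()); // 43
    }
}
```

Moving GetAwaiter into a static class as an extension method should work equally well, which is the route Rx takes for IObservable<T>.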

Note how this is similar to how foreach does not require IEnumerable<T>, only a GetEnumerator method that returns a compatible object. Binding by pattern like this (a form of duck typing) lets the compiler call the members directly on a value type without boxing it, which avoids unnecessary allocations in performance-sensitive code.
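
The foreach comparison can be sketched the same way. In the example below, Countdown, its nested Enumerator and ForeachDemo are hypothetical names; neither struct implements IEnumerable or IEnumerator, yet foreach accepts them because the required members are present.

```csharp
using System;

// Hypothetical types: no IEnumerable / IEnumerator anywhere.
public struct Countdown
{
    private readonly int _start;
    public Countdown(int start) { _start = start; }

    // foreach only needs an accessible GetEnumerator method.
    public Enumerator GetEnumerator() { return new Enumerator(_start); }

    // The enumerator is a struct, so iterating does not allocate or box.
    public struct Enumerator
    {
        private int _next;
        public Enumerator(int start) { _next = start + 1; }

        public int Current { get { return _next; } }

        // Step down by one; stop once we reach zero.
        public bool MoveNext() { _next--; return _next > 0; }
    }
}

public static class ForeachDemo
{
    public static int Sum(Countdown countdown)
    {
        int total = 0;
        foreach (int n in countdown) total += n;
        return total;
    }

    public static void Main()
    {
        Console.WriteLine(Sum(new Countdown(3))); // 3 + 2 + 1 = 6
    }
}
```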

Mike Zboray answered Oct 04 '22 20:10


I think it is because System.Reactive.Linq defines a GetAwaiter extension method on IObservable<T>. As @mike z explained, that allows you to await an IObservable<T>. Here is the method:

public static AsyncSubject<TSource> GetAwaiter<TSource>(this IObservable<TSource> source)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    return s_impl.GetAwaiter<TSource>(source);
}

The returned type AsyncSubject<T> implements INotifyCompletion and has the IsCompleted property and GetResult method.

public sealed class AsyncSubject<T> : ISubject<T>, ISubject<T, T>, IObserver<T>, IObservable<T>, IDisposable, INotifyCompletion
{
    // Fields
    private Exception _exception;
    private readonly object _gate;
    private bool _hasValue;
    private bool _isDisposed;
    private bool _isStopped;
    private ImmutableList<IObserver<T>> _observers;
    private T _value;

    // Methods
    public AsyncSubject();
    private void CheckDisposed();
    public void Dispose();
    public AsyncSubject<T> GetAwaiter();
    public T GetResult();
    public void OnCompleted();
    public void OnCompleted(Action continuation);
    private void OnCompleted(Action continuation, bool originalContext);
    public void OnError(Exception error);
    public void OnNext(T value);
    public IDisposable Subscribe(IObserver<T> observer);

    // Properties
    public bool HasObservers { get; }
    public bool IsCompleted { get; }
}
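
As a rough illustration of what this means in practice, the sketch below awaits a finite observable. It assumes the System.Reactive NuGet package is referenced; AwaitObservableDemo and LastOfRangeAsync are hypothetical names. Awaiting an observable waits for the sequence to complete and yields its last element.

```csharp
using System;
using System.Reactive.Linq;   // brings the GetAwaiter extension for IObservable<T> into scope
using System.Threading.Tasks;

public static class AwaitObservableDemo
{
    public static async Task<int> LastOfRangeAsync()
    {
        // Range(1, 5) emits 1..5 and then completes; await yields the final value.
        return await Observable.Range(1, 5);
    }

    public static void Main()
    {
        Console.WriteLine(LastOfRangeAsync().GetAwaiter().GetResult()); // 5
    }
}
```

One consequence worth noting: Observable.Interval never completes, so awaiting it directly, as in the question, would not resume. Limiting the sequence first, for example with Take(1) or FirstAsync(), gives the await something to finish on.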

NeddySpaghetti answered Oct 04 '22 20:10