
IObservable<T>.ToTask<T> method returns Task awaiting activation

Why does this task wait forever?

var task = Observable
    .FromEventPattern<MessageResponseEventArgs>(communicator, "PushMessageRecieved")
    .Where(i => i.EventArgs.GetRequestFromReceivedMessage().Name == requestName)
    .Select(i => i.EventArgs)
    .RunAsync(System.Threading.CancellationToken.None)
    .ToTask();

task.Wait();

I know "PushMessageRecieved" is fired: I can set a breakpoint on the Select lambda and hit it. But task.Wait() never returns.

Better Update: FirstAsync() is what I am looking for:

    public static Task<MessageResponseEventArgs> HandlePushMessageRecievedAsync(this ICommunicator communicator, RequestName requestName)
    {
        if (communicator == null) return Task.FromResult<MessageResponseEventArgs>(null);

        var observable = GetCommunicatorObservableForPushMessageReceived(communicator);
        return observable
            .Where(i => i.GetRequestFromReceivedMessage().Name == requestName)
            .FirstAsync()
            .ToTask();
    }

where GetCommunicatorObservableForPushMessageReceived() is:

    static IObservable<MessageResponseEventArgs> GetCommunicatorObservableForPushMessageReceived(ICommunicator communicator)
    {
        if (communicatorObservableForPushMessageReceived == null)
        {
            communicatorObservableForPushMessageReceived = Observable
                .FromEventPattern<MessageResponseEventArgs>(communicator, "PushMessageRecieved")
                .Where(i => !IsPreviousMessage(i.EventArgs.GetRequestFromReceivedMessage().EventId))
                .Select(i => i.EventArgs);
        }

        return communicatorObservableForPushMessageReceived;
    }

Update: what is somewhat horrific is this (but it works):

public static Task<MessageResponseEventArgs> HandlePushMessageRecievedAsync(this ICommunicator communicator, RequestName requestName)
{
    if (communicator == null) return Task.FromResult<MessageResponseEventArgs>(null);

    var completionSource = new TaskCompletionSource<MessageResponseEventArgs>();

    Observable
        .FromEventPattern<MessageResponseEventArgs>(communicator, "PushMessageRecieved")
        .Where(i => i.EventArgs.GetRequestFromReceivedMessage().Name == requestName)
        .Select(i => i.EventArgs)
        .ToEvent().OnNext += (args) =>
        {
            if (args.Response.Errors != null && args.Response.Errors.Any())
            {
                completionSource.TrySetException(args.Response.Errors.Select(j => new Exception(j.ErrorMessage)));
            }
            else
            {
                completionSource.TrySetResult(args);
            }
        };

    return completionSource.Task;
}

rasx asked Apr 23 '14 at 20:04


2 Answers

Both RunAsync and ToTask yield the very last value in the observable, so no value is produced until the observable completes. Observables created with FromEventPattern do not usually complete, which means the task never finishes. You need to force them to complete with something like Take or TakeUntil.

I'll also note that RunAsync and ToTask are essentially redundant here, so there is no need to use both.

In your case, I'll assume you are really interested in the first value that makes it through your filter:

var task = Observable
    .FromEventPattern<MessageResponseEventArgs>(communicator, "PushMessageRecieved")
    .FirstAsync(i => i.EventArgs.GetRequestFromReceivedMessage().Name == requestName)
    .Select(i => i.EventArgs)
    .ToTask();

task.Wait();
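
The completion behavior described above can be seen in isolation with a small sketch. It assumes the System.Reactive package and uses a Subject<int> as a hypothetical stand-in for an event source that never completes on its own; the class and method names are illustrative only and are not part of the original question.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class CompletionDemo
{
    // Returns whether the plain ToTask() task had completed before OnCompleted,
    // plus the values produced by the FirstAsync task and the plain ToTask task.
    public static (bool lastCompletedEarly, int first, int last) Run()
    {
        // Hypothetical stand-in for an event stream (assumption, not from the question).
        var source = new Subject<int>();

        // Completes only when the source signals OnCompleted, yielding the last value.
        Task<int> last = source.ToTask();

        // Completes as soon as the first value arrives.
        Task<int> first = source.FirstAsync().ToTask();

        source.OnNext(1);
        source.OnNext(2);

        // The source has not completed yet, so the plain ToTask() task is still pending.
        bool lastCompletedEarly = last.IsCompleted;

        // Reading Result does not block here because the task has already completed.
        int firstValue = first.Result;

        // Only now can the plain ToTask() task finish.
        source.OnCompleted();

        return (lastCompletedEarly, firstValue, last.Result);
    }
}
```

Calling CompletionDemo.Run() should report that the plain ToTask() task was still pending after two values, while the FirstAsync() task already held the first one. With an event-based source there is no OnCompleted call at all, which is why the original task never finishes.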

Brandon answered Oct 08 '22 22:10


The handlers for the PushMessageRecieved event that you are observing need to run on the UI thread, in the current synchronization context. You're blocking the UI thread (which represents the current context) while you wait for this task. The task can't finish because you're blocking the thread it needs, and you'll never finish waiting on it because it can't run. Deadlock.

You shouldn't be synchronously blocking on the task but rather asynchronously executing code as a continuation of that task.
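
A minimal sketch of that approach, assuming the System.Reactive package: the caller awaits the task so the rest of the method runs as a continuation instead of blocking the thread. The WaitForMessageAsync helper and the string-based message stream are hypothetical simplifications of the question's types.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class AwaitDemo
{
    // Hypothetical helper: waits for the first matching message without
    // blocking the calling thread.
    public static async Task<string> WaitForMessageAsync(
        IObservable<string> messages, string wanted)
    {
        // await yields control back to the caller until a match arrives,
        // so a UI thread would stay free to raise the event.
        string match = await messages.FirstAsync(m => m == wanted).ToTask();

        // Everything after the await runs as a continuation of the task.
        return "handled " + match;
    }
}
```

In a UI event handler this would be used as `var result = await AwaitDemo.WaitForMessageAsync(messages, "b");` from an async method, rather than calling Wait() or Result on the returned task.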

Servy answered Oct 08 '22 22:10