
Async API call inside an actor and exceptions

Tags:

c#

akka.net

I know about PipeTo, but some stuff, like synchronous waiting on nested continuation, seems to go against the async & await way.

So, my first question [1] would be: is there any 'magic' here, so that we can just synchronously wait for nested tasks in a continuation and it's still async in the end?

While we're at async & await differences, how are failures handled?

Let's create a simple example:

using System;
using System.Threading.Tasks;

public static class AsyncOperations
{
    // Simulates a slow operation that fails after one second.
    public static async Task<int> CalculateAnswerAsync()
    {
        await Task.Delay(1000).ConfigureAwait(false);
        throw new InvalidOperationException("Testing!");
        //return 42;
    }

    // Simulates a second slow step that formats the number.
    public static async Task<string> ConvertAsync(int number)
    {
        await Task.Delay(600).ConfigureAwait(false);
        return number + " :)";
    }
}

In a 'regular', async & await way:

var answer = await AsyncOperations.CalculateAnswerAsync();
var converted = await AsyncOperations.ConvertAsync(answer);

the exception will bubble up from the first operation, just as you'd expect.
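To make the difference concrete, here is a minimal sketch in plain TPL (no Akka.NET involved). The class and method names are made up for illustration. It shows that await rethrows the original exception, while blocking on .Result wraps it in an AggregateException:

```csharp
using System;
using System.Threading.Tasks;

public static class ExceptionShapes
{
    // Hypothetical stand-in for CalculateAnswerAsync with a shorter delay.
    private static async Task<int> FailAsync()
    {
        await Task.Delay(10).ConfigureAwait(false);
        throw new InvalidOperationException("Testing!");
    }

    // await unwraps the task and rethrows the original exception type.
    public static async Task<string> CaughtByAwaitAsync()
    {
        try
        {
            await FailAsync().ConfigureAwait(false);
            return "no exception";
        }
        catch (Exception ex)
        {
            return ex.GetType().Name;
        }
    }

    // Blocking on .Result surfaces an AggregateException that wraps the original.
    public static string CaughtByResult()
    {
        try
        {
            return FailAsync().Result.ToString();
        }
        catch (AggregateException ex)
        {
            return "AggregateException wrapping " + ex.InnerException.GetType().Name;
        }
    }
}
```

The second shape is what a ContinueWith callback sees when it touches result.Result on a faulted task.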

Now, let's create an actor that's going to work with those async operations. For the sake of argument, let's say that CalculateAnswerAsync and ConvertAsync should be used one after another as one full operation (similar to, for example, StreamWriter.WriteLineAsync and StreamWriter.FlushAsync if you just want to write one line to a stream).

public sealed class AsyncTestActor : ReceiveActor
{
    public sealed class Start
    {
    }

    public sealed class OperationResult
    {
        private readonly string message;

        public OperationResult(string message)
        {
            this.message = message;
        }

        public string Message
        {
            get { return message; }
        }
    }

    public AsyncTestActor()
    {
        Receive<Start>(msg =>
               {
                   AsyncOperations.CalculateAnswerAsync()
                     .ContinueWith(result =>
                            {
                                var number = result.Result;
                                var conversionTask = AsyncOperations.ConvertAsync(number);
                                conversionTask.Wait(1500);
                                return new OperationResult(conversionTask.Result);
                            })
                     .PipeTo(Self);
                });
        Receive<OperationResult>(msg => Console.WriteLine("Got " + msg.Message));
    }
}

If there are no exceptions, I still get Got 42 :) without any issues, which brings me back to the 'magic' point above [1]. Also, are the AttachedToParent and ExecuteSynchronously flags used in the example optional, or are they pretty much required for everything to work as intended? They don't seem to have any effect on exception handling...

Now, if the CalculateAnswerAsync throws an exception, which means that result.Result throws AggregateException, it's pretty much swallowed without a trace.

What should I do here, if it's even possible, to make the exception inside an asynchronous operation crash the actor as a 'regular' exception would?

Patryk Ćwiek asked Feb 16 '15 21:02


2 Answers

The joys of error-handling in the TPL :)

Once a Task starts running on its own thread, everything that happens inside it is already asynchronous from the caller - including error-handling

  1. When you kick off your first Task inside of an actor, that task runs independently on the ThreadPool from your actor. This means that anything you do inside that Task will already be asynchronous from your actor - because it's running on a different thread. This is why I made a Task.Wait call inside the PipeTo sample you linked to at the top of your post. Makes no difference to the actor - it just looks like a long-running task.
  2. Exceptions - if your inner task failed, the conversionTask.Result property will throw an AggregateException wrapping the exception captured during its run, so you'll want to add some error-handling inside your Task to ensure that your actor gets notified that something went wrong. Notice I did just that here: https://github.com/petabridge/akkadotnet-code-samples/blob/master/PipeTo/src/PipeTo.App/Actors/HttpDownloaderActor.cs#L117 - if you turn your Exceptions into messages your actor can handle, birds start singing, rainbows shine, and TPL errors stop being a source of pain and agony.
  3. As for what happens when an exception gets thrown...

Now, if the CalculateAnswerAsync throws an exception, which means that result.Result throws AggregateException, it's pretty much swallowed without a trace.

The AggregateException will contain the list of inner exceptions wrapped inside of it - the reason the TPL has this concept of aggregate errors is in the event that (a) you have one task that is the continuation of multiple tasks in aggregate, i.e. Task.WhenAll, or (b) you have errors propagated up the ContinueWith chain back to the parent. You can also call AggregateException.Flatten() to make it a little easier to manage nested exceptions.
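As a rough illustration of case (b), the sketch below uses plain TPL with hypothetical names. A continuation reads t.Result on a faulted antecedent, so the failure ends up nested one level deeper, and Flatten() collapses that nesting:

```csharp
using System;
using System.Threading.Tasks;

public static class FlattenDemo
{
    // Hypothetical stand-in for CalculateAnswerAsync with a shorter delay.
    private static async Task<int> FailAsync()
    {
        await Task.Delay(10).ConfigureAwait(false);
        throw new InvalidOperationException("Testing!");
    }

    public static string Describe()
    {
        // t.Result throws AggregateException inside the continuation,
        // so the continuation task faults with a nested AggregateException.
        Task<int> continuation = FailAsync().ContinueWith(t => t.Result + 1);
        try
        {
            continuation.Wait();
            return "no exception";
        }
        catch (AggregateException outer)
        {
            // Before flattening, the direct inner exception is another AggregateException.
            string nested = outer.InnerException.GetType().Name;
            // After flattening, the original exception is a direct child.
            string flattened = outer.Flatten().InnerException.GetType().Name;
            return nested + " / " + flattened;
        }
    }
}
```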

Best Practices for TPL + Akka.NET

Dealing with Exceptions from the TPL is a nuisance, that's true - but the best way to deal with it is to try/catch exceptions inside your Task and turn them into message classes your actor can handle.
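One possible shape for this, as a sketch only: the OperationFailed message and the SafePipeline helper are hypothetical names, not part of Akka.NET or of the linked sample. The helper runs both steps and always completes with a message object, so nothing is lost when a step throws:

```csharp
using System;
using System.Threading.Tasks;

public sealed class OperationResult
{
    public OperationResult(string message) { Message = message; }
    public string Message { get; }
}

// Hypothetical failure message carrying the original exception.
public sealed class OperationFailed
{
    public OperationFailed(Exception cause) { Cause = cause; }
    public Exception Cause { get; }
}

public static class SafePipeline
{
    // Runs both steps in sequence and converts any exception into a message.
    public static async Task<object> RunAsync(
        Func<Task<int>> calculate,
        Func<int, Task<string>> convert)
    {
        try
        {
            int number = await calculate().ConfigureAwait(false);
            string text = await convert(number).ConfigureAwait(false);
            return new OperationResult(text);
        }
        catch (Exception ex)
        {
            // await rethrows the original exception, so no AggregateException unwrapping is needed.
            return new OperationFailed(ex);
        }
    }
}
```

Inside the actor, the returned task could then be piped with PipeTo(Self), with one Receive handler per message type. If the goal is to crash the actor so that supervision kicks in, the OperationFailed handler could rethrow the cause, for example via ExceptionDispatchInfo.Capture(msg.Cause).Throw() to preserve the stack trace.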

Also, are the AttachedToParent and ExecuteSynchronously flags provided in an example optional, or are they pretty much required to have everything working as intended?

This is mostly an issue for when you have continuations on continuations - PipeTo automatically uses these flags on itself. It has zero impact on error handling, but ensures that your continuations are executed immediately on the same thread as the original Task.

I recommend using these flags only when you're doing a lot of nested continuations - the TPL starts to take some liberties with how it schedules your tasks once you go deeper than 1 continuation (and in fact, flags like OnlyOnRanToCompletion stop being accepted after more than 1 continuation).
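For reference, a small plain-TPL sketch of how such flags are passed to ContinueWith. The names are illustrative and this is not how PipeTo itself is implemented. It combines ExecuteSynchronously with the conditional OnlyOn... options and shows that the branch whose condition is not met ends up canceled rather than run:

```csharp
using System;
using System.Threading.Tasks;

public static class ContinuationOptionsDemo
{
    // Hypothetical stand-in for CalculateAnswerAsync with a shorter delay.
    private static async Task<int> FailAsync()
    {
        await Task.Delay(10).ConfigureAwait(false);
        throw new InvalidOperationException("Testing!");
    }

    public static async Task<string> RunAsync()
    {
        Task<int> antecedent = FailAsync();

        // Runs only if the antecedent succeeded; otherwise this task is canceled.
        Task<string> onSuccess = antecedent.ContinueWith(
            t => "answer: " + t.Result,
            TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.ExecuteSynchronously);

        // Runs only if the antecedent faulted; t.Exception is an AggregateException.
        Task<string> onFailure = antecedent.ContinueWith(
            t => "failed: " + t.Exception.InnerException.Message,
            TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously);

        string message = await onFailure.ConfigureAwait(false);

        bool successBranchCanceled;
        try
        {
            await onSuccess.ConfigureAwait(false);
            successBranchCanceled = false;
        }
        catch (OperationCanceledException)
        {
            successBranchCanceled = true;
        }

        return message + " / success branch canceled: " + successBranchCanceled;
    }
}
```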

Aaronontheweb answered Oct 01 '22 02:10


Just to add to what Aaron said. As of yesterday, we do support safe async await inside actors when using the Task dispatcher.

public class AsyncAwaitActor : ReceiveActor
{
    public AsyncAwaitActor()
    {
        Receive<string>(async m =>
        {
            await Task.Delay(TimeSpan.FromSeconds(1));
            Sender.Tell("done");
        });
    }
}

public class AskerActor : ReceiveActor
{
    public AskerActor(ActorRef other)
    {
        Receive<string>(async m =>
        {
            var res = await other.Ask(m);
            Sender.Tell(res);
        });
    }
}

public class ActorAsyncAwaitSpec : AkkaSpec
{
    [Fact]
    public async Task Actors_should_be_able_to_async_await_ask_message_loop()
    {
        var actor = Sys.ActorOf(Props.Create<AsyncAwaitActor>()
        .WithDispatcher("akka.actor.task-dispatcher"),
            "Worker");
        //IMPORTANT: you must use the akka.actor.task-dispatcher
        //otherwise async await is not safe

        var asker = Sys.ActorOf(Props.Create(() => new AskerActor(actor))
        .WithDispatcher("akka.actor.task-dispatcher"),
            "Asker");

        var res = await asker.Ask("something");
        Assert.Equal("done", res);
    }
}

This is not our default dispatcher, since it comes with a price in performance and throughput. There is also a risk of deadlocks if you trigger tasks that block (e.g. using task.Wait() or task.Result). So the PipeTo pattern is still the preferred approach, since it is more true to the actor model. But the async await support is there for you as an extra tool if you really need to do some TPL integration.

This feature actually uses PipeTo under the covers. It takes every task continuation, wraps it in a special message, passes that message back to the actor, and executes the continuation inside the actor's own concurrency context.
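To give a feel for the idea, here is a toy model in plain .NET. It is not Akka.NET's actual implementation, and MailboxContext and MailboxDemo are made-up names. A custom SynchronizationContext queues every continuation as a "message" that a single loop thread processes one at a time, so code after an await runs back on the same thread as the handler that started it:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Toy "mailbox": every continuation posted to this context becomes a queued message.
public sealed class MailboxContext : SynchronizationContext
{
    private readonly BlockingCollection<Action> mailbox = new BlockingCollection<Action>();

    public override void Post(SendOrPostCallback d, object state)
    {
        mailbox.Add(() => d(state));
    }

    // Processes queued messages one at a time on the calling thread until Complete() is called.
    public void RunLoop()
    {
        SetSynchronizationContext(this);
        foreach (Action message in mailbox.GetConsumingEnumerable())
        {
            message();
        }
    }

    public void Complete()
    {
        mailbox.CompleteAdding();
    }
}

public static class MailboxDemo
{
    // Returns true when the code after the await ran on the mailbox thread.
    public static bool ContinuationRunsOnMailboxThread()
    {
        var context = new MailboxContext();
        int loopThreadId = -1;
        int continuationThreadId = -2;

        var loopThread = new Thread(() =>
        {
            loopThreadId = Environment.CurrentManagedThreadId;
            context.RunLoop();
        });

        // First "message": an async handler. Its continuation is re-posted to the mailbox.
        context.Post(async _ =>
        {
            await Task.Delay(50);
            continuationThreadId = Environment.CurrentManagedThreadId;
            context.Complete();
        }, null);

        loopThread.Start();
        loopThread.Join();
        return loopThreadId == continuationThreadId;
    }
}
```

This also hints at why blocking calls are risky in such a setup: if the handler blocked the loop thread waiting on the task, the queued continuation could never run.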

Roger Johansson answered Oct 01 '22 03:10