Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

await async lambda in ActionBlock

I have a class Receiver with an ActionBlock:

public class Receiver<T> : IReceiver<T>
{

  private ActionBlock<T> _receiver;

  public Task<bool> Send(T item) 
  {
     if(_receiver!=null)
        return _receiver.SendAsync(item);

     //Do some other stuff her
  }

  public void Register (Func<T, Task> receiver)
  {
    _receiver = new ActionBlock<T> (receiver);
  }

  //...
}

The Register-Action for the ActionBlock is a async-Method with a await-Statement:

private static async Task Writer(int num)
{
   Console.WriteLine("start " + num);
   await Task.Delay(500);
   Console.WriteLine("end " + num);
}

Now what i want to do is to wait synchronously (if a condition is set) until the action method is finished to get an exclusive behavior:

var receiver = new Receiver<int>();
receiver.Register((Func<int, Task) Writer);
receiver.Send(5).Wait(); //does not wait the action-await here!

The Problem is when the "await Task.Delay(500);" statement is executed, the "receiver.Post(5).Wait();" does not wait anymore.

I have tried several variants (TaskCompletionSource, ContinueWith, ...) but it does not work.

Has anyone an idea how to solve the problem?

like image 262
obi111 Avatar asked Dec 04 '12 13:12

obi111


1 Answers

ActionBlock by default will enforce exclusive behavior (only one item is processed at a time). If you mean something else by "exclusive behavior", you can use TaskCompletionSource to notify your sender when the action is complete:

... use ActionBlock<Tuple<int, TaskCompletionSource<object>>> and Receiver<Tuple<int, TaskCompletionSource<object>>>
var receiver = new Receiver<Tuple<int, TaskCompletionSource<object>>>();
receiver.Register((Func<Tuple<int, TaskCompletionSource<object>>, Task) Writer);
var tcs = new TaskCompletionSource<object>();
receiver.Send(Tuple.Create(5, tcs));
tcs.Task.Wait(); // if you must

private static async Task Writer(int num, TaskCompletionSource<object> tcs)
{
  Console.WriteLine("start " + num);
  await Task.Delay(500);
  Console.WriteLine("end " + num);
  tcs.SetResult(null);
}

Alternatively, you could use AsyncLock (included in my AsyncEx library):

private static AsyncLock mutex = new AsyncLock();

private static async Task Writer(int num)
{
  using (await mutex.LockAsync())
  {
    Console.WriteLine("start " + num);
    await Task.Delay(500);
    Console.WriteLine("end " + num);
  }
}
like image 180
Stephen Cleary Avatar answered Sep 18 '22 19:09

Stephen Cleary