How to pass an async method inside the Observable.Do extension method?

Given:

  1. IObservable<T> src
  2. async Task F(T){...}.
  3. F can only be called sequentially. So await F(x);await F(y); is fine, but Task.Factory.ContinueWhenAll(new[]{F(x),F(y)}, _ => {...}); is wrong, because F(x) and F(y) must not run concurrently.

It is clear to me that await src.Do(F) is wrong, because it would run F concurrently.

How do I do this correctly?

Asked by mark, Mar 19 '23 15:03


2 Answers

Observable.Do is for side effects only, not for sequential composition. SelectMany may be what you want. As of Rx 2.0, there are overloads of SelectMany that make it really easy to compose observables with Task<T>. (Just be aware of the additional concurrency that may be introduced by these and similar Task/Observable coop operators.)

var q = from value in src
        from _ in F(value.X).AsVoidAsync()  // See helper definition below
        from __ in F(value.Y).AsVoidAsync()
        select value;

However, based on the fact that you specifically asked about the Do operator, I suspect that src may contain more than one value and you don't want overlapping calls to F for each value either. In that case, consider that SelectMany is actually just like Select->Merge; therefore, what you probably want is Select->Concat.

// using System.Reactive.Threading.Tasks

var q = src.Select(x => Observable.Defer(() => F(x).ToObservable())).Concat();

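Don't forget to use Defer: calling F(x) starts the task immediately (the task is hot), so without Defer every F(x) would already be running by the time Concat subscribes to it. Defer postpones the call to F until Concat subscribes to that inner observable, which happens only after the previous one has completed.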
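A minimal sketch of the Select->Concat approach that can be run to check for overlap, assuming the System.Reactive NuGet package is referenced. The F below is a hypothetical stand-in that counts how many calls are in flight at once; SequentialDemo and RunAsync are names invented for this illustration, not part of Rx.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class SequentialDemo
{
    private static readonly object Gate = new object();
    private static int _running;     // calls to F currently in flight
    private static int _maxRunning;  // highest overlap observed so far

    // Hypothetical stand-in for F: records overlap, then simulates async work.
    private static async Task F(int x)
    {
        lock (Gate)
        {
            _running++;
            _maxRunning = Math.Max(_maxRunning, _running);
        }

        await Task.Delay(20);

        lock (Gate)
        {
            _running--;
        }
    }

    // Runs F once per source value and returns the highest overlap observed.
    public static async Task<int> RunAsync()
    {
        lock (Gate)
        {
            _running = 0;
            _maxRunning = 0;
        }

        IObservable<int> src = Observable.Range(1, 5);

        // Defer delays each call to F until Concat subscribes to that inner
        // observable, which happens only after the previous one completes.
        IObservable<Unit> q = src
            .Select(x => Observable.Defer(() => F(x).ToObservable()))
            .Concat();

        // Awaiting the observable subscribes to it and waits for completion.
        await q.LastOrDefaultAsync();

        lock (Gate)
        {
            return _maxRunning;
        }
    }
}
```

If the Defer wrapper is removed, all five calls to F start as soon as Select projects the values, so the observed overlap should rise above one.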

AsVoidAsync Extension:

IObservable<T> requires a T, yet Task represents void, thus Rx's conversion operators require us to get a Task<T> from a Task. I tend to use Rx's System.Reactive.Unit struct for T:

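using System.Reactive;          // Unit
using System.Threading.Tasks;   // Task, TaskCompletionSource, TaskContinuationOptions

public static class TaskExtensions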
{
  public static Task<Unit> AsVoidAsync(this Task task)
  {
    return task.ContinueWith(t =>
    {
      var tcs = new TaskCompletionSource<Unit>();

      if (t.IsCanceled)
      {
        tcs.SetCanceled();
      }
      else if (t.IsFaulted)
      {
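        // Pass the inner exceptions so the AggregateException is not wrapped a second time.
        tcs.SetException(t.Exception.InnerExceptions);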
      }
      else
      {
        tcs.SetResult(Unit.Default);
      }

      return tcs.Task;
    },
    TaskContinuationOptions.ExecuteSynchronously)
    .Unwrap();
  }
}

Alternatively, you could skip the helper and call the ToObservable overload for the non-generic Task (in System.Reactive.Threading.Tasks), which returns an IObservable<Unit>.

Answered by Dave Sexton, Apr 08 '23 12:04


The easiest way is to use TPL Dataflow, which will throttle to a single concurrent invocation by default, even for asynchronous methods:

var block = new ActionBlock<T>(F);
src.Subscribe(block.AsObserver());
await block.Completion;
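
A self-contained sketch of the same idea, assuming the System.Reactive and System.Threading.Tasks.Dataflow packages are available. The delegate f is a hypothetical stand-in for F, and DataflowDemo and RunAsync are names made up for this example. MaxDegreeOfParallelism is set to 1 explicitly only to make the default visible.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowDemo
{
    // Feeds an observable into an ActionBlock and returns the values
    // in the order in which the handler finished processing them.
    public static async Task<List<int>> RunAsync()
    {
        var processed = new List<int>();

        // Hypothetical stand-in for F: simulate async work, then record the value.
        // The unsynchronized List is acceptable only because the block
        // runs one invocation at a time.
        Func<int, Task> f = async x =>
        {
            await Task.Delay(10);
            processed.Add(x);
        };

        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 1  // the default, spelled out for clarity
        };
        var block = new ActionBlock<int>(f, options);

        // AsObserver forwards OnNext to the block and completes the block
        // when the source signals OnCompleted.
        Observable.Range(1, 5).Subscribe(block.AsObserver());

        // Completes once every queued item has been processed.
        await block.Completion;
        return processed;
    }
}
```

Typing the handler as Func<int, Task> selects the asynchronous ActionBlock constructor, so the block waits for each returned task to finish before starting the next item.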

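Alternatively, you could subscribe an asynchronous handler that you throttle yourself. Note that Do on its own never subscribes, so the handler is passed to Subscribe here. The lambda is async void, which means the subscription cannot be awaited and an exception thrown by F will not reach the observer's OnError:

var semaphore = new SemaphoreSlim(1);
src.Subscribe(async x =>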
{
  await semaphore.WaitAsync();
  try
  {
    await F(x);
  }
  finally
  {
    semaphore.Release();
  }
});

Answered by Stephen Cleary, Apr 08 '23 12:04