
How to constrain concurrency the right way in Rx.NET

Consider the following code snippet:

var result = await GetSource(1000).SelectMany(s => getResultAsync(s).ToObservable()).ToList();

The problem with this code is that getResultAsync runs concurrently in an unconstrained fashion, which may not be what we want in certain cases. Suppose I want to restrict it to at most 10 concurrent invocations. What is the Rx.NET way to do it?

I am enclosing a simple console application that demonstrates the problem, along with my clumsy solution to it.

There is a bit of extra code, like the Stats class and the artificial random sleeps. They are there to ensure I truly get concurrent execution and can reliably compute the maximum concurrency reached during the run.

The method RunUnconstrained demonstrates the naive, unconstrained run. The method RunConstrained shows my solution, which is not very elegant. Ideally, I would like to constrain the concurrency by simply applying a dedicated Rx operator to the monad, without sacrificing performance.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

namespace RxConstrainedConcurrency
{
    class Program
    {
        public class Stats
        {
            public int MaxConcurrentCount;
            public int CurConcurrentCount;
            public readonly object MaxConcurrentCountGuard = new object();
        }

        static void Main()
        {
            RunUnconstrained().GetAwaiter().GetResult();
            RunConstrained().GetAwaiter().GetResult();
        }
        static async Task RunUnconstrained()
        {
            await Run(AsyncOp);
        }
        static async Task RunConstrained()
        {
            using (var sem = new SemaphoreSlim(10))
            {
                await Run(async (s, pause, stats) =>
                {
                    // ReSharper disable AccessToDisposedClosure
                    await sem.WaitAsync();
                    try
                    {
                        return await AsyncOp(s, pause, stats);
                    }
                    finally
                    {
                        sem.Release();
                    }
                    // ReSharper restore AccessToDisposedClosure
                });
            }
        }
        static async Task Run(Func<string, int, Stats, Task<int>> getResultAsync)
        {
            var stats = new Stats();
            var rnd = new Random(0x1234);
            var result = await GetSource(1000).SelectMany(s => getResultAsync(s, rnd.Next(30), stats).ToObservable()).ToList();
            Debug.Assert(stats.CurConcurrentCount == 0);
            Debug.Assert(result.Count == 1000);
            Debug.Assert(!result.Contains(0));
            Debug.WriteLine("Max concurrency = " + stats.MaxConcurrentCount);
        }

        static IObservable<string> GetSource(int count)
        {
            return Enumerable.Range(1, count).Select(i => i.ToString()).ToObservable();
        }

        static Task<int> AsyncOp(string s, int pause, Stats stats)
        {
            return Task.Run(() =>
            {
                int cur = Interlocked.Increment(ref stats.CurConcurrentCount);
                if (stats.MaxConcurrentCount < cur)
                {
                    lock (stats.MaxConcurrentCountGuard)
                    {
                        if (stats.MaxConcurrentCount < cur)
                        {
                            stats.MaxConcurrentCount = cur;
                        }
                    }
                }

                try
                {
                    Thread.Sleep(pause);
                    return int.Parse(s);
                }
                finally
                {
                    Interlocked.Decrement(ref stats.CurConcurrentCount);
                }
            });
        }
    }
}
asked Mar 25 '15 18:03 by mark


1 Answer

You can do this in Rx using the overload of Merge that constrains the number of concurrent subscriptions to inner observables.

This form of Merge is applied to a stream of streams.

Ordinarily, using SelectMany to invoke an async task from an event does two jobs: it projects each event into an observable stream whose single event is the result, and it flattens all the resulting streams together.
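To make those two jobs concrete, here is a minimal sketch that writes the same pipeline once with SelectMany and once as an explicit Select followed by the parameterless Merge. The SquareAsync helper and the class name are hypothetical and only stand in for any async operation; async Main assumes C# 7.1 or later.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class SelectManyVsMerge
{
    // Hypothetical async operation, used only for illustration.
    public static Task<int> SquareAsync(int x) => Task.FromResult(x * x);

    // One operator: project each event to a stream and flatten.
    public static IObservable<int> WithSelectMany(IObservable<int> source) =>
        source.SelectMany(x => SquareAsync(x).ToObservable());

    // Two operators: Select builds a stream of streams, Merge() flattens it.
    public static IObservable<int> WithSelectAndMerge(IObservable<int> source) =>
        source.Select(x => SquareAsync(x).ToObservable()).Merge();

    public static async Task Main()
    {
        var source = Observable.Range(1, 5);
        IList<int> a = await WithSelectMany(source).ToList();
        IList<int> b = await WithSelectAndMerge(source).ToList();

        // Sort before printing: in general neither form guarantees ordering.
        Console.WriteLine(string.Join(",", a.OrderBy(v => v)));
        Console.WriteLine(string.Join(",", b.OrderBy(v => v)));
    }
}
```

Both lines print the same set of values. Splitting the projection from the flattening is what makes room for the Merge overload that takes a concurrency limit.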

To use Merge we must use a regular Select to project each event into the invocation of an async task (thus creating a stream of streams) and then use Merge to flatten the result. Merge does this in a constrained way by subscribing to at most the supplied number of inner streams at any point in time.

We must be careful to start each asynchronous task only upon subscription to the wrapping inner stream. Calling the async method starts the task immediately, and ToObservable() merely wraps the already-running task, so Merge would have nothing left to throttle. We must therefore defer the call until subscription using Observable.Defer.
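The difference between eager and deferred invocation can be seen in a small sketch. TrackedOp and the counter are hypothetical helpers that only record how many times the operation has been started; they are not part of Rx.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

public static class EagerVsDeferred
{
    private static int _started;

    // Number of times TrackedOp has been invoked so far.
    public static int Started => Volatile.Read(ref _started);

    // Hypothetical operation; it only records that it was invoked.
    public static Task<int> TrackedOp(int x)
    {
        Interlocked.Increment(ref _started);
        return Task.FromResult(x);
    }

    public static void Main()
    {
        // TrackedOp runs right here, before anyone subscribes.
        IObservable<int> eager = TrackedOp(1).ToObservable();
        Console.WriteLine("After eager creation: " + Started);

        // Nothing runs yet; Defer only stores the factory lambda.
        IObservable<int> deferred =
            Observable.Defer(() => TrackedOp(2).ToObservable());
        Console.WriteLine("After deferred creation: " + Started);

        // Subscribing invokes the factory, which starts the operation.
        using (deferred.Subscribe()) { }
        Console.WriteLine("After subscribing: " + Started);
    }
}
```

The counter goes up when the eager observable is created, stays unchanged when the deferred one is created, and goes up again only once the deferred observable is subscribed to. That subscription is the moment Merge controls.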

Here's an example putting all these steps together:

void Main()
{
    var xs = Observable.Range(0, 10); // source events

    // "Double" here is our async operation to be constrained,
    // in this case to 3 concurrent invocations

    xs.Select(x => Observable.Defer(() => Double(x).ToObservable()))
      .Merge(3)
      .Subscribe(Console.WriteLine,
                 () => Console.WriteLine("Max: " + MaxConcurrent));
}

private static int Concurrent;
private static int MaxConcurrent;
private static readonly object gate = new Object();

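// Simulated async operation that also records the peak number of overlapping calls.
public async Task<int> Double(int x)
{
    var concurrent = Interlocked.Increment(ref Concurrent);
    lock (gate)
    {
        MaxConcurrent = Math.Max(concurrent, MaxConcurrent);
    }

    try
    {
        await Task.Delay(TimeSpan.FromSeconds(1));
        return x * 2;
    }
    finally
    {
        // Always decrement, even if the operation fails.
        Interlocked.Decrement(ref Concurrent);
    }
}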

The maximum concurrency output here will be "3". Replace Merge(3) with the parameterless Merge() to go "unconstrained" and you'll get "10" instead.

Another (equivalent) way of getting the Defer effect that reads a bit nicer is to use FromAsync instead of Defer + ToObservable:

xs.Select(x => Observable.FromAsync(() => Double(x))).Merge(3)
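Applied to the pipeline from the question, the same idea might look like the sketch below. It assumes a simplified stand-in for the question's AsyncOp (Task.Delay instead of Thread.Sleep inside Task.Run, and static counters instead of the Stats class); the class and member names are illustrative only.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ConstrainedRun
{
    private static int _current;
    private static int _max;

    // Highest number of overlapping AsyncOp calls seen so far.
    public static int MaxObserved => Volatile.Read(ref _max);

    // Stand-in for the question's AsyncOp; tracks peak concurrency.
    public static async Task<int> AsyncOp(string s, int pauseMs)
    {
        int cur = Interlocked.Increment(ref _current);
        // Lock-free "store the maximum" loop.
        int seen;
        while ((seen = Volatile.Read(ref _max)) < cur &&
               Interlocked.CompareExchange(ref _max, cur, seen) != seen) { }
        try
        {
            await Task.Delay(pauseMs);
            return int.Parse(s);
        }
        finally
        {
            Interlocked.Decrement(ref _current);
        }
    }

    public static async Task<IList<int>> RunAsync(int count, int maxConcurrent)
    {
        var rnd = new Random(0x1234);
        return await Observable.Range(1, count)
            .Select(i => i.ToString())
            .Select(s =>
            {
                // Draw the pause here, on the source thread, because Random
                // is not thread-safe and FromAsync's lambda may run elsewhere.
                int pause = rnd.Next(30);
                return Observable.FromAsync(() => AsyncOp(s, pause));
            })
            .Merge(maxConcurrent) // subscribe to at most maxConcurrent inner streams
            .ToList();
    }

    public static async Task Main()
    {
        IList<int> result = await RunAsync(1000, 10);
        Console.WriteLine("Count = " + result.Count);
        Console.WriteLine("Max concurrency = " + MaxObserved);
    }
}
```

With Merge(10) the reported maximum should never exceed 10, and no semaphore is needed because the throttling lives in the operator rather than in the async delegate.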
answered Oct 26 '22 18:10 by James World