Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

.NET queued tasks (with async/await)

I have a large number of tasks (~1000) that need to be executed. I am running on a 4-core processor, so I'd like to process 4 tasks at a time, in parallel.

To give you a starting point, here is some sample code.

class Program
{
    public class LongOperation
    {
        private static readonly Random RandomNumberGenerator = new Random(0);
        const int UpdateFrequencyMilliseconds = 100;

        public int CurrentProgress { get; set; }

        public int TargetProcess { get; set; }

        public LongOperation()
        {
            TargetProcess = RandomNumberGenerator.Next(
                (int)TimeSpan.FromSeconds(5).TotalMilliseconds / UpdateFrequencyMilliseconds, 
                (int)TimeSpan.FromSeconds(10).TotalMilliseconds / UpdateFrequencyMilliseconds);
        }

        public async Task Execute()
        {
            while (!IsCompleted)
            {
                await Task.Delay(UpdateFrequencyMilliseconds);
                CurrentProgress++;
            }
        }

        public bool IsCompleted => CurrentProgress >= TargetProcess;
    }

    static void Main(string[] args)
    {
        Task.Factory.StartNew(async () =>
        {
            var operations = new List<LongOperation>();

            for(var x = 1; x <= 10; x++)
                operations.Add(new LongOperation());

            await ProcessOperations(4, operations);
        }).Wait();
    }

    public static async Task ProcessOperations(int maxSimultaneous, List<LongOperation> operations)
    {
        await Task.WhenAll(operations.Select(x => x.Execute()));
        // TODO: Process up to 4 operations at a time, until every operation is completed.
    }
}

I'd like some input on what classes I would use, and how I'd structure the ProcessOperations to process up to 4 operations at a time, until all operations are completed, in a single await-able Task.

I'm thinking of using a SemaphoreSlim object in some way, since it seems to be geared towards throttling a resource/process.

like image 817
Paul Knopf Avatar asked Oct 18 '22 21:10

Paul Knopf


1 Answers

As already been suggested, you need to use a handy TPL Dataflow library, with two blocks, for storing the messages before processing, and for actual action on them:

// storage
var operations = new BufferBlock<LongOperation>();
// no more than 4 actions at the time
var actions = new ActionBlock<LongOperation>(x => x.Execute(),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

// consume new operations automatically
operations.LinkTo(actions);
for(var x = 1; x <= 10; ++x)
{
    // blocking sending
    operations.Post(new LongOperation());
    // awaitable send for async operations
    // await operations.SendAsync(new LongOperation());
}

Also you can intriduce some throttling limits, like no more than 30 operations at the time, by settings the BoundedCapacity option for your buffer.

like image 166
VMAtm Avatar answered Oct 21 '22 01:10

VMAtm