I have a large number of tasks (~1000) that need to be executed. I am running on a 4-core processor, so I'd like to process 4 tasks at a time, in parallel.
To give you a starting point, here is some sample code.
class Program
{
public class LongOperation
{
private static readonly Random RandomNumberGenerator = new Random(0);
const int UpdateFrequencyMilliseconds = 100;
public int CurrentProgress { get; set; }
public int TargetProcess { get; set; }
public LongOperation()
{
TargetProcess = RandomNumberGenerator.Next(
(int)TimeSpan.FromSeconds(5).TotalMilliseconds / UpdateFrequencyMilliseconds,
(int)TimeSpan.FromSeconds(10).TotalMilliseconds / UpdateFrequencyMilliseconds);
}
public async Task Execute()
{
while (!IsCompleted)
{
await Task.Delay(UpdateFrequencyMilliseconds);
CurrentProgress++;
}
}
public bool IsCompleted => CurrentProgress >= TargetProcess;
}
static void Main(string[] args)
{
Task.Factory.StartNew(async () =>
{
var operations = new List<LongOperation>();
for(var x = 1; x <= 10; x++)
operations.Add(new LongOperation());
await ProcessOperations(4, operations);
}).Wait();
}
public static async Task ProcessOperations(int maxSimultaneous, List<LongOperation> operations)
{
await Task.WhenAll(operations.Select(x => x.Execute()));
// TODO: Process up to 4 operations at a time, until every operation is completed.
}
}
I'd like some input on what classes I would use, and how I'd structure the ProcessOperations
to process up to 4 operations at a time, until all operations are completed, in a single await-able Task
.
I'm thinking of using a SemaphoreSlim
object in some way, since it seems to be geared towards throttling a resource/process.
As already been suggested, you need to use a handy TPL Dataflow library, with two blocks, for storing the messages before processing, and for actual action on them:
// storage
var operations = new BufferBlock<LongOperation>();
// no more than 4 actions at the time
var actions = new ActionBlock<LongOperation>(x => x.Execute(),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
// consume new operations automatically
operations.LinkTo(actions);
for(var x = 1; x <= 10; ++x)
{
// blocking sending
operations.Post(new LongOperation());
// awaitable send for async operations
// await operations.SendAsync(new LongOperation());
}
Also you can intriduce some throttling limits, like no more than 30 operations at the time, by settings the BoundedCapacity
option for your buffer.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With