Parallel processing, but process objects with the same identifier sequentially [duplicate]

I've come across a problem that I can readily define, but for the life of me I can't digest the MSDN documentation well enough to find the best solution. It's been a while since I had to actually think about parallel processing outside of UI responsiveness.

Say I have a concurrent collection of Tasks that need to be processed. For example, the Tasks might load data to various consumers by type (Consumer1, Consumer2, Consumer3 ... Consumer[N]). The underlying operation of sending the data is the same for every Task, but each consumer can only accept one source at a time.

Basically, I want to process as much in parallel as possible, with the caveat that I can only send one Task to each consumer at a time. If a Job for a Consumer is already in progress, I should move on to the next item in the collection and leave this one until the Job in progress for that Consumer has completed. The concurrent collection can also be added to externally at any time, and if new consumer types appeared we'd need additional threads.

I guess my question boils down to this: how do I customize the "Take" from the collection so that I only grab the next Task whose Consumer doesn't already have a Job in progress?
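The customized "Take" described above can be sketched as a linear scan over the pending items that removes and returns the first one whose consumer is not currently busy. This is only a minimal, single-threaded illustration under stated assumptions: `PendingQueue`, `TryTakeNextIdle`, `Release` and the `busy` set are hypothetical names, and in a real system every call would have to run under a lock. Each call also costs O(n) in the number of pending items.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper illustrating the question's "customized Take".
// Not thread-safe: a real implementation would call these methods under a lock.
public static class PendingQueue
{
    // Removes and returns the first pending item (in insertion order) whose key
    // is not in `busy`, and marks that key as busy. Returns false if every
    // pending item targets a busy consumer.
    public static bool TryTakeNextIdle<T, TKey>(
        List<T> pending, HashSet<TKey> busy, Func<T, TKey> keySelector, out T item)
    {
        for (int i = 0; i < pending.Count; i++)
        {
            // HashSet.Add returns false when the key is already marked busy.
            if (busy.Add(keySelector(pending[i])))
            {
                item = pending[i];
                pending.RemoveAt(i);
                return true;
            }
        }
        item = default;
        return false;
    }

    // Marks a consumer as idle again once its current job has completed.
    public static void Release<TKey>(HashSet<TKey> busy, TKey key) => busy.Remove(key);
}
```

With `Bank1` busy and pending items `[Bank1, Bank2, Bank1]`, the first call skips `Bank1` and takes `Bank2`; a second call returns false until `Release` frees one of the two consumers.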

Any ideas on what I'm missing here or if I'm even on the right path?

For example, say we have a Mediator Queue with Tasks associated with banking transactions.

We might add the following to our mediator queue (assume SendSummaryData and SendTransactionData use the same interface contract to send data):

  1. SendTransactionData -> Bank1
  2. SendTransactionData -> Bank2
  3. SendSummaryData -> Arbiter
  4. SendTransactionData -> Bank1
  5. SendTransactionData -> Bank3
  6. SendTransactionData -> Bank1
  7. SendTransactionData -> Bank2

1, 2, 3 and 5 can be processed in parallel, but because each consumer's own system can only accept one input at a time, transaction 4 must wait for transaction 1 to complete, and transaction 6 must wait for transaction 4. Similarly, transaction 7 must wait for transaction 2.

Before any of the initial processes have completed, someone may add another grouping:

  8. SendSummaryData -> Arbiter
  9. SendTransactionData -> Bank1
  10. SendTransactionData -> Bank4

10 can be picked up immediately if a thread is available, but 8 and 9 must be queued behind their related tasks (3 and 6 respectively).
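The ordering rule in both examples reduces to this: each transaction must wait for the most recent earlier transaction with the same target, and a transaction whose target has no earlier entry can start as soon as a thread is free. A minimal sketch that computes these predecessors, assuming each job is just an (Id, Target) pair; the `Job` record, `Ordering` class and `Predecessors` method are hypothetical names used only for illustration:

```csharp
using System.Collections.Generic;

// Hypothetical job shape: an id plus the consumer (bank or arbiter) it targets.
public record Job(int Id, string Target);

public static class Ordering
{
    // For each job, returns the id of the job it must wait for (the latest
    // earlier job with the same target), or null if it has no predecessor.
    public static Dictionary<int, int?> Predecessors(IEnumerable<Job> jobs)
    {
        var lastByTarget = new Dictionary<string, int>();
        var result = new Dictionary<int, int?>();
        foreach (var job in jobs)
        {
            result[job.Id] = lastByTarget.TryGetValue(job.Target, out int prev) ? prev : (int?)null;
            lastByTarget[job.Target] = job.Id;
        }
        return result;
    }
}
```

Run over transactions 1 to 10, this gives no predecessor for 1, 2, 3, 5 and 10, while 4 waits for 1, 6 for 4, 7 for 2, 8 for 3 and 9 for 6, which matches the description.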

Obviously there would be better ways to design a system to accomplish this, but these are essentially the specs I'm looking to satisfy.

PurpleMonkeyDiswasher asked May 01 '26 06:05



1 Answer

Here is an approach based on the Parallel.ForEachAsync API, available in .NET 6 and later. The custom ForEachExclusivePerKeyAsync method below supports all the options and functionality of the Parallel.ForEachAsync overload that takes an IAsyncEnumerable<T> as source, and its behavior in case of errors or cancellation is identical. The only difference is that it prevents concurrent operations for elements with the same key. The key of each element is obtained via a keySelector function, and the processing of items with the same key is serialized. An item whose key is already being processed does not hold up a worker: it is placed in a per-key queue, and the worker that is processing that key picks it up when the current item completes.

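using System;
using System.Collections.Generic;
using System.Runtime.InteropServices; // CollectionsMarshal
using System.Threading;
using System.Threading.Tasks;

// Declare this extension method inside a non-nested static class,
// for example: public static class ParallelExtensions { ... }
/// <summary>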
/// Executes a for-each operation on an async-enumerable sequence in which
/// iterations may run concurrently, enforcing a non-concurrent execution policy
/// for elements having the same key.
/// </summary>
public static Task ForEachExclusivePerKeyAsync<TSource, TKey>(
    this IAsyncEnumerable<TSource> source,
    ParallelOptions parallelOptions,
    Func<TSource, CancellationToken, ValueTask> body,
    Func<TSource, TKey> keySelector,
    IEqualityComparer<TKey> keyComparer = default)
{
    ArgumentNullException.ThrowIfNull(body);
    ArgumentNullException.ThrowIfNull(keySelector);
    // The other arguments are validated by the Parallel.ForEachAsync itself.
    Dictionary<TKey, Queue<TSource>> perKey = new(keyComparer);
    return Parallel.ForEachAsync(source, parallelOptions, async (item, ct) =>
    {
        TKey key = keySelector(item);
        Queue<TSource> queue;
        lock (perKey)
        {
            // If there is no other task in-flight with the same key,
            // insert a null queue as an indicator of activity,
            // and start a processing loop for items with this key.
            // Otherwise enqueue this item and return.
            queue = CollectionsMarshal.GetValueRefOrAddDefault(
                perKey, key, out bool exists) ??= (exists ? new() : null);
            if (queue is not null)
            {
                queue.Enqueue(item);
                return;
            }
        }

        // Fire the task for this item, and for all other items with the
        // same key that might be enqueued while this task is in-flight.
        while (true)
        {
            ct.ThrowIfCancellationRequested();
            await body(item, ct); // Continue on captured context
            lock (perKey)
            {
                if (queue is null || queue.Count == 0)
                {
                    // Assume that meanwhile no other item was enqueued.
                    perKey.Remove(key, out queue);
                    if (queue is null || queue.Count == 0) return;
                    // The queue is actually not empty, so add it back.
                    perKey.Add(key, queue);
                }
                item = queue.Dequeue(); // Grab the next item.
            }
        }
    });
}

Usage example. A Channel<T> serves as the source of the IAsyncEnumerable<T> sequence and lets new items be added at any time, as the question requires:

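using System.Threading.Channels;

var channel = Channel.CreateUnbounded<Transaction>();
//...
var options = new ParallelOptions() { MaxDegreeOfParallelism = 20 };
// Start the loop without awaiting it yet, so that this code can keep
// writing items to the channel while the loop consumes them.
Task processing = channel.Reader.ReadAllAsync().ForEachExclusivePerKeyAsync(options, async (x, _) =>
{
    await ProcessTransactionAsync(x);
}, keySelector: x => x.Bank);
//...
channel.Writer.TryWrite(new Transaction() { Bank = "Bank1" });
channel.Writer.TryWrite(new Transaction() { Bank = "Bank2" });
//...
channel.Writer.Complete(); // Ends ReadAllAsync, which lets the loop finish.
await processing;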

The ForEachExclusivePerKeyAsync implementation above uses the CollectionsMarshal.GetValueRefOrAddDefault method to look up and update the perKey dictionary in a single operation, improving performance at the cost of readability. For a less performant but more readable version, see the 4th revision of this answer.
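To make the one-liner and the exit logic easier to follow, here is a sketch of the same per-key bookkeeping written with plain `TryGetValue` calls. It is an illustrative rewrite, not the code of the 4th revision mentioned above, and the `PerKeyGate` class with its `TryEnter`/`TryGetNext` methods is a hypothetical name. The dictionary encodes three states per key: absent (idle), mapped to `null` (one item in flight, nothing waiting), and mapped to a queue (items waiting).

```csharp
#nullable enable
using System.Collections.Generic;

// Hypothetical, readable restatement of the bookkeeping in ForEachExclusivePerKeyAsync.
// Per key: absent = idle, null = one item in flight, queue = items waiting.
public sealed class PerKeyGate<TKey, TItem> where TKey : notnull
{
    private readonly Dictionary<TKey, Queue<TItem>?> _perKey = new();

    // Returns true if the caller should start processing `item` now, or false
    // if the item was parked behind an in-flight item with the same key.
    public bool TryEnter(TKey key, TItem item)
    {
        lock (_perKey)
        {
            if (!_perKey.TryGetValue(key, out var queue))
            {
                _perKey.Add(key, null); // Mark the key as active, nothing waiting yet.
                return true;
            }
            if (queue is null)
            {
                queue = new Queue<TItem>();
                _perKey[key] = queue;
            }
            queue.Enqueue(item);
            return false;
        }
    }

    // Called by the worker that owns `key` after an item completes. Returns true
    // with the next parked item, or false (marking the key idle) if none is waiting.
    public bool TryGetNext(TKey key, out TItem next)
    {
        lock (_perKey)
        {
            var queue = _perKey[key];
            if (queue is not null && queue.Count > 0)
            {
                next = queue.Dequeue();
                return true;
            }
            _perKey.Remove(key);
            next = default!;
            return false;
        }
    }
}
```

A worker would call `TryEnter` for each incoming item and, when it returns true, process the item and then keep calling `TryGetNext` (processing each returned item) until it returns false, which mirrors the `while (true)` loop in the answer.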

For a version of the same method that does not depend on the Parallel.ForEachAsync API, and can therefore run on .NET versions earlier than 6, see the 3rd revision of this answer.

Theodor Zoulias answered May 03 '26 19:05



