
Distribute ConcurrentQueue<> amongst workers equally

Consider the following setup:

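using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class WorkItem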
{
    public string Name { get; set; }
}

public class Worker
{
    public async Task DoWork(WorkItem workItem)
    {
        await Task.Delay(1000); //run task
    }
}


public class Engine
{
    private readonly ConcurrentQueue<WorkItem> _workItems = new();
    private readonly List<Worker> _workers = new();

    public Engine(int workers, int threads)
    {
        ConcurrentThreadsForEachWorker = threads;
        for (int i = 0; i < workers; i++)
        {
            _workers.Add(new());
        }
    }

    public int ConcurrentThreadsForEachWorker { get; private set; }    

    public async Task RunAsync(CancellationToken token)
    {
        while(!token.IsCancellationRequested)
        {
            //distribute work amongst workers here
        }
    }
}

Let's say the Engine constructor receives 2 for workers and 4 for threads. I want to implement the RunAsync method so that the load is spread equally across the workers, as follows:

WorkItem 1 -> Worker 1 (running 1)
WorkItem 2 -> Worker 2 (running 1)
WorkItem 3 -> Worker 1 (running 2)
WorkItem 4 -> Worker 2 (running 2)
WorkItem 5 -> Worker 1 (running 3)
WorkItem 6 -> Worker 2 (running 3)
WorkItem 7 -> Worker 1 (running 4 - full)
WorkItem 8 -> Worker 2 (running 4 - full)
WorkItem 9 -> Both workers are full, so wait until one of them is free

More Info

Maybe a bit of context would help. I'm trying to process a lot of inboxes (around 1000). The "Worker" is a Microsoft Graph Client and the "WorkItem" is an inbox. Each Graph Client can run 4 queries at a given time. So I want to register a couple of App IDs in the Graph Client and let each of them handle 4 of these inboxes. I basically want to process them as fast as possible.

Microsoft Graph has a subscription feature that when there's a new email, it can notify the program. So when that notification arrives, I want to add it to the queue and if the queue has enough room, it should immediately process it.

I'd also love it if there was a way that we could add an item to the front of the queue (instead of the back). So if there's an urgent change or a high-priority inbox, the clients could process that inbox ASAP.

Alireza Noori asked Oct 22 '25 07:10


1 Answer

A simple solution might be to store the workers in a Queue<T> instead of a List<T>, dequeue a worker every time you need one, and enqueue it back immediately:

Queue<Worker> _workers = new();
for (int i = 0; i < workersCount; i++) _workers.Enqueue(new());

ParallelOptions options = new() { MaxDegreeOfParallelism = 10 }; // arbitrary overall limit
await Parallel.ForEachAsync(workItems, options, async (workItem, ct) =>
{
    Worker worker;
    lock (_workers)
    {
        worker = _workers.Dequeue();
        _workers.Enqueue(worker);
    }
    await worker.DoWork(workItem);
});

This way the workers are used in a round-robin fashion, as an unlimited resource. The MaxConcurrencyPerWorker policy (the ConcurrentThreadsForEachWorker property in the question) is not enforced.
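For a fixed set of workers, the same round-robin rotation can also be sketched without a lock, by indexing into an array with an atomically incremented counter. The RoundRobin<T> class and its Next method are hypothetical names used only for illustration. Like the queue version, this sketch does not limit the concurrency per worker.

```csharp
using System;
using System.Threading;

// Hypothetical helper: hands out items in round-robin order without a lock.
public sealed class RoundRobin<T>
{
    private readonly T[] _items;
    private int _counter = -1;

    public RoundRobin(T[] items)
    {
        if (items is null || items.Length == 0)
            throw new ArgumentException("At least one item is required.", nameof(items));
        _items = (T[])items.Clone();
    }

    public T Next()
    {
        // Interlocked.Increment wraps around on overflow instead of throwing;
        // the cast to uint keeps the resulting index non-negative.
        uint ticket = unchecked((uint)Interlocked.Increment(ref _counter));
        return _items[(int)(ticket % (uint)_items.Length)];
    }
}
```

With two workers A and B, successive calls to Next return A, B, A, B, and so on, regardless of which thread makes the call.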

If you want to enforce this policy, then you must use them as a limited resource, so enqueue them back in the queue only after the completion of the DoWork operation. You must also enqueue each Worker multiple times in the queue (MaxConcurrencyPerWorker times), in an interleaving manner. You must also deal with the case that the pool of workers has been exhausted, in which case the execution flow will have to be suspended until a worker becomes available. A Queue<T> doesn't offer this functionality. You will need a Channel<T>:

Channel<Worker> workerPool = Channel.CreateUnbounded<Worker>();
for (int i = 0; i < MaxConcurrencyPerWorker; i++)
    foreach (Worker worker in _workers)
        workerPool.Writer.TryWrite(worker);

ParallelOptions options = new() { MaxDegreeOfParallelism = workerPool.Reader.Count };
await Parallel.ForEachAsync(workItems, options, async (workItem, ct) =>
{
    Worker worker = await workerPool.Reader.ReadAsync(ct); // honor the loop's cancellation token
    try
    {
        await worker.DoWork(workItem);
    }
    finally { workerPool.Writer.TryWrite(worker); }
});

A Channel<T> is, roughly, an asynchronous counterpart of the BlockingCollection<T>. The ChannelReader<T>.ReadAsync method completes synchronously if a worker is currently stored in the channel, and asynchronously if the channel is currently empty. In the above example ReadAsync always completes synchronously, because the degree of parallelism of the Parallel.ForEachAsync loop is limited to the total (not distinct) number of worker references in the pool, so a running iteration never finds the pool empty.
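One way to check that the per-worker limit actually holds is to let each worker record its own peak number of concurrent operations. The sketch below does this under stated assumptions: CountingWorker and ChannelPoolDemo are hypothetical names, the work items are plain integers, and Task.Delay stands in for the real operation.

```csharp
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical worker that records its own peak number of concurrent operations.
public sealed class CountingWorker
{
    private int _running;
    private int _peak;
    public int Peak => Volatile.Read(ref _peak);

    public async Task DoWork(int workItem)
    {
        int now = Interlocked.Increment(ref _running);
        // Raise the recorded peak if this call set a new maximum.
        int seen;
        while (now > (seen = Volatile.Read(ref _peak)))
            Interlocked.CompareExchange(ref _peak, now, seen);
        try { await Task.Delay(20); } // stand-in for the real operation
        finally { Interlocked.Decrement(ref _running); }
    }
}

public static class ChannelPoolDemo
{
    // Processes all items and returns the peak concurrency observed on each worker.
    public static async Task<int[]> RunAsync(
        int workerCount, int maxConcurrencyPerWorker, int itemCount)
    {
        CountingWorker[] workers = Enumerable.Range(0, workerCount)
            .Select(_ => new CountingWorker()).ToArray();

        // Each worker is stored maxConcurrencyPerWorker times, interleaved.
        Channel<CountingWorker> pool = Channel.CreateUnbounded<CountingWorker>();
        int slots = 0;
        for (int i = 0; i < maxConcurrencyPerWorker; i++)
            foreach (CountingWorker worker in workers)
                if (pool.Writer.TryWrite(worker)) slots++;

        ParallelOptions options = new() { MaxDegreeOfParallelism = slots };
        await Parallel.ForEachAsync(Enumerable.Range(0, itemCount), options,
            async (item, ct) =>
        {
            CountingWorker worker = await pool.Reader.ReadAsync(ct);
            try { await worker.DoWork(item); }
            finally { pool.Writer.TryWrite(worker); } // give the slot back
        });

        return workers.Select(w => w.Peak).ToArray();
    }
}
```

Calling await ChannelPoolDemo.RunAsync(2, 4, 40) returns one peak per worker, and neither value can exceed 4, because each worker is present in the pool at most 4 times.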


Update: The above solution does not guarantee perfect balancing in the long run. Workers are written back to the workerPool in the order their operations complete, so the pool can gradually lose its "interleaving" property, ending up with many references to the same Worker stored one after the other. For precise control it might be necessary to keep track of the usage statistics of each individual worker. You would need some structure resembling an LRU cache to hold the workers and the statistics, something like an ObjectPool<T> with priority management.

Here is what I came up with: a PriorityPool<T> class that is backed by a simple array (instead of something more complex like a dictionary, a sorted set or a priority queue), and is equipped with a SemaphoreSlim in order to enforce the MaxConcurrencyPerWorker policy.

public class PriorityPool<T> : IDisposable
{
    private struct Entry
    {
        public T Item;
        public int ConcurrencyCount;
        public long LastUseStamp;
    }

    private readonly Entry[] _items;
    private readonly IEqualityComparer<T> _comparer;
    private readonly SemaphoreSlim _semaphore;
    private long _lastUseStamp;

    public int Count { get { return _items.Length; } }

    public PriorityPool(IEnumerable<T> items, int maxConcurrencyPerItem,
        IEqualityComparer<T> comparer = default)
    {
        ArgumentNullException.ThrowIfNull(items);
        if (maxConcurrencyPerItem < 1)
            throw new ArgumentOutOfRangeException(nameof(maxConcurrencyPerItem));
        _items = items.Select(x => new Entry() { Item = x }).ToArray();
        _comparer = comparer ?? EqualityComparer<T>.Default;
        if (_items.Length == 0)
            throw new ArgumentException("No items found.", nameof(items));
        if (_items.DistinctBy(e => e.Item, _comparer).Count() != _items.Length)
            throw new ArgumentException("Duplicate item found.", nameof(items));
        int semaphoreSize = _items.Length * maxConcurrencyPerItem;
        _semaphore = new(semaphoreSize, semaphoreSize);
    }

    public async ValueTask<T> GetAsync(CancellationToken cancellationToken = default)
    {
        await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
        lock (_items)
        {
            int index = 0;
            for (int i = 1; i < _items.Length; i++)
            {
                int diff = _items[i].ConcurrencyCount - _items[index].ConcurrencyCount;
                if (diff > 0) continue;
                if (diff < 0 || _items[i].LastUseStamp < _items[index].LastUseStamp)
                    index = i;
            }
            _items[index].ConcurrencyCount++;
            _items[index].LastUseStamp = ++_lastUseStamp;
            return _items[index].Item;
        }
    }

    public void Return(T item)
    {
        lock (_items)
        {
            int index;
            for (index = 0; index < _items.Length; index++)
                if (_comparer.Equals(item, _items[index].Item)) break;
            if (index == _items.Length)
                throw new InvalidOperationException("Item not found.");
            if (_items[index].ConcurrencyCount == 0)
                throw new InvalidOperationException("Negative concurrency.");
            _items[index].ConcurrencyCount--;
        }
        _semaphore.Release();
    }

    public void Dispose() => _semaphore.Dispose();
}

Usage example:

using PriorityPool<Worker> workerPool = new(_workers, MaxConcurrencyPerWorker);

//...

Worker worker = await workerPool.GetAsync();
try
{
    await worker.DoWork(workItem);
}
finally { workerPool.Return(worker); }

The GetAsync method returns the worker that currently has the lowest concurrency level, meaning the fewest operations in flight. In case of a tie, it returns the least recently used worker.
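The selection rule can be looked at in isolation as a pure function over two parallel arrays. This is only a sketch for illustration: LeastLoadedSelector and Pick are hypothetical names, and the arrays stand in for the ConcurrencyCount and LastUseStamp fields of the pool entries.

```csharp
using System;

public static class LeastLoadedSelector
{
    // Returns the index with the lowest concurrency count.
    // Ties go to the entry with the oldest (smallest) use stamp;
    // if the stamps are equal too, the earliest index wins.
    public static int Pick(int[] concurrency, long[] lastUse)
    {
        if (concurrency.Length == 0 || concurrency.Length != lastUse.Length)
            throw new ArgumentException("Arrays must be non-empty and of equal length.");
        int best = 0;
        for (int i = 1; i < concurrency.Length; i++)
        {
            if (concurrency[i] < concurrency[best] ||
                (concurrency[i] == concurrency[best] && lastUse[i] < lastUse[best]))
                best = i;
        }
        return best;
    }
}
```

For example, Pick(new[] { 2, 1, 1 }, new long[] { 5, 9, 7 }) returns 2: entries 1 and 2 tie on a concurrency of 1, and entry 2 has the older stamp.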

The PriorityPool<T> class is thread-safe, with the exception of Dispose, which must be used only when all other operations on the PriorityPool<T> have completed (behavior inherited from the SemaphoreSlim).
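The question also asks for a way to let an urgent inbox jump ahead of the others, which the pool of workers does not address because it concerns the queue of work items. One possible sketch, under stated assumptions, is a queue with two levels where urgent items are always dequeued first. TwoLevelQueue<T> and its members are hypothetical names, and two levels are assumed to be enough.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical two-level queue: urgent items are served before normal ones.
public sealed class TwoLevelQueue<T>
{
    private readonly ConcurrentQueue<T> _urgent = new();
    private readonly ConcurrentQueue<T> _normal = new();
    private readonly SemaphoreSlim _available = new(0);

    public void Enqueue(T item, bool urgent = false)
    {
        (urgent ? _urgent : _normal).Enqueue(item);
        _available.Release(); // one permit per queued item
    }

    public async ValueTask<T> DequeueAsync(CancellationToken cancellationToken = default)
    {
        // Suspends asynchronously while both queues are empty.
        await _available.WaitAsync(cancellationToken).ConfigureAwait(false);
        // Holding a permit means an item is reserved for this caller.
        if (_urgent.TryDequeue(out T item) || _normal.TryDequeue(out item))
            return item;
        throw new InvalidOperationException("Permit acquired but no item found.");
    }
}
```

A consumer loop would await DequeueAsync for the next inbox and then obtain a worker from the pool as in the usage example. Items within the same level keep their FIFO order.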

Theodor Zoulias answered Oct 23 '25 20:10