
Prioritized queues in Task Parallel Library

Is there any prior work on adding tasks to the TPL runtime with varying priorities?

If not, generally speaking, how would I implement this?

Ideally, I plan to use the producer-consumer pattern to add "todo" work to the TPL. At times I may discover that a low-priority job needs to be upgraded to a high-priority job (relative to the others).

If anyone has some search keywords I should use when searching for this, please mention them, since I haven't yet found code that will do what I need.

makerofthings7 asked Feb 18 '13 16:02



2 Answers

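So here is a rather naive concurrent implementation around a rather naive priority queue. The idea is that a sorted set holds pairs of the real item and its priority, and is given a comparer that orders the pairs by priority first, falling back to comparing the items themselves only when the priorities are equal (so that distinct items with the same priority can coexist in the set). The lowest priority value is taken first. The constructor takes a function that computes the priority for a given object.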

As for the actual implementation, the methods are not efficient: I just lock around everything. A more efficient implementation could not use SortedSet as the priority queue, and re-implementing a priority queue that can be accessed concurrently in an effective way is not going to be that easy.

To change the priority of an item you need to remove the item from the set and then add it again. To find it without iterating over the whole set, you need to know the old priority as well as the new priority.
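As a rough illustration of the remove-and-re-add step, here is a minimal sketch on a bare SortedSet, without any locking. The class names `ReprioritizeDemo` and `ByPriorityThenItem` and the sample job names are made up for this example; the comparer only mirrors the priority-then-item ordering described above.

```csharp
using System;
using System.Collections.Generic;

public static class ReprioritizeDemo
{
    // Hypothetical comparer: priority first, then the item itself as a tie-breaker.
    private sealed class ByPriorityThenItem : IComparer<Tuple<string, int>>
    {
        public int Compare(Tuple<string, int> a, Tuple<string, int> b)
        {
            int byPriority = a.Item2.CompareTo(b.Item2);
            return byPriority != 0 ? byPriority : string.CompareOrdinal(a.Item1, b.Item1);
        }
    }

    // Returns the item at the head of the set after promoting "backup".
    public static string Run()
    {
        var set = new SortedSet<Tuple<string, int>>(new ByPriorityThenItem());
        set.Add(Tuple.Create("backup", 5));
        set.Add(Tuple.Create("email", 1));
        set.Add(Tuple.Create("report", 3));

        // The old priority (5) is part of the sort key, so Remove can locate
        // the entry through the tree instead of scanning every element.
        bool removed = set.Remove(Tuple.Create("backup", 5));
        if (removed)
        {
            set.Add(Tuple.Create("backup", 0)); // re-add with the new priority
        }

        return set.Min.Item1;
    }

    // Removing with a stale priority finds nothing, because the key does not match.
    public static bool RemoveWithWrongPriority()
    {
        var set = new SortedSet<Tuple<string, int>>(new ByPriorityThenItem());
        set.Add(Tuple.Create("backup", 5));
        return set.Remove(Tuple.Create("backup", 4));
    }
}
```

Calling `ReprioritizeDemo.Run()` should leave "backup" at the head of the set, while `RemoveWithWrongPriority()` shows why the old priority has to be exact.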

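using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Lower priority values are taken first. Every operation locks on a single object.
public class ConcurrentPriorityQueue<T> : IProducerConsumerCollection<T>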
{
    private object key = new object();
    private SortedSet<Tuple<T, int>> set;

    private Func<T, int> prioritySelector;

    public ConcurrentPriorityQueue(Func<T, int> prioritySelector, IComparer<T> comparer = null)
    {
        this.prioritySelector = prioritySelector;
        set = new SortedSet<Tuple<T, int>>(
            new MyComparer(comparer ?? Comparer<T>.Default));
    }

    // Orders pairs by priority (lowest value first); ties fall back to the item
    // comparer so that distinct items with equal priority can coexist in the set.
    // The class is not generic itself: it reuses the outer T instead of shadowing it.
    private class MyComparer : IComparer<Tuple<T, int>>
    {
        private IComparer<T> comparer;
        public MyComparer(IComparer<T> comparer)
        {
            this.comparer = comparer;
        }
        public int Compare(Tuple<T, int> first, Tuple<T, int> second)
        {
            var returnValue = first.Item2.CompareTo(second.Item2);
            if (returnValue == 0)
                returnValue = comparer.Compare(first.Item1, second.Item1);
            return returnValue;
        }
    }

    public bool TryAdd(T item)
    {
        lock (key)
        {
            return set.Add(Tuple.Create(item, prioritySelector(item)));
        }
    }

    public bool TryTake(out T item)
    {
        lock (key)
        {
            if (set.Count > 0)
            {
                var first = set.First();
                item = first.Item1;
                return set.Remove(first);
            }
            else
            {
                item = default(T);
                return false;
            }
        }
    }

    public bool ChangePriority(T item, int oldPriority, int newPriority)
    {
        lock (key)
        {
            if (set.Remove(Tuple.Create(item, oldPriority)))
            {
                return set.Add(Tuple.Create(item, newPriority));
            }
            else
                return false;
        }
    }

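    public bool ChangePriority(T item)
    {
        lock (key)
        {
            // Linear scan: without the old priority the set cannot be searched by key.
            var result = set.FirstOrDefault(pair => object.Equals(pair.Item1, item));

            // FirstOrDefault returns null when nothing matches (Tuple is a reference type),
            // so check for null before touching the pair.
            if (result != null)
            {
                return ChangePriority(item, result.Item2, prioritySelector(item));
            }
            else
            {
                return false;
            }
        }
    }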

    public void CopyTo(T[] array, int index)
    {
        lock (key)
        {
            foreach (var item in set.Select(pair => pair.Item1))
            {
                array[index++] = item;
            }
        }
    }

    public T[] ToArray()
    {
        lock (key)
        {
            return set.Select(pair => pair.Item1).ToArray();
        }
    }

    public IEnumerator<T> GetEnumerator()
    {
        return ToArray().AsEnumerable().GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    public void CopyTo(Array array, int index)
    {
        lock (key)
        {
            foreach (var item in set.Select(pair => pair.Item1))
            {
                array.SetValue(item, index++);
            }
        }
    }

    public int Count
    {
        get { lock (key) { return set.Count; } }
    }

    public bool IsSynchronized
    {
        get { return true; }
    }

    public object SyncRoot
    {
        get { return key; }
    }
}

Once you have an IProducerConsumerCollection<T> instance, which the above object is, you can pass it to the BlockingCollection<T> constructor as the internal backing collection. That gives you an easier-to-use API on top of it, such as a blocking Take and GetConsumingEnumerable.
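A minimal sketch of that wrapping, kept self-contained by using a deliberately tiny stand-in collection. `MinFirstCollection` and `BlockingDemo` are hypothetical names invented for this example; in practice you would pass an instance of the queue above instead.

```csharp
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the queue above: a locked list that
// always hands out the smallest int first.
public sealed class MinFirstCollection : IProducerConsumerCollection<int>
{
    private readonly object gate = new object();
    private readonly List<int> items = new List<int>();

    public bool TryAdd(int item)
    {
        lock (gate) { items.Add(item); return true; }
    }

    public bool TryTake(out int item)
    {
        lock (gate)
        {
            if (items.Count == 0) { item = 0; return false; }
            item = items.Min();   // O(n) scan; fine for a sketch
            items.Remove(item);
            return true;
        }
    }

    public int[] ToArray()
    {
        lock (gate) { return items.OrderBy(x => x).ToArray(); }
    }

    public void CopyTo(int[] array, int index) { ToArray().CopyTo(array, index); }
    public void CopyTo(Array array, int index) { ToArray().CopyTo(array, index); }
    public IEnumerator<int> GetEnumerator() { return ((IEnumerable<int>)ToArray()).GetEnumerator(); }
    IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }
    public int Count { get { lock (gate) { return items.Count; } } }
    public bool IsSynchronized { get { return true; } }
    public object SyncRoot { get { return gate; } }
}

public static class BlockingDemo
{
    public static List<int> Run()
    {
        // BlockingCollection<T> delegates storage and ordering to the collection it wraps.
        var queue = new BlockingCollection<int>(new MinFirstCollection());
        queue.Add(5);
        queue.Add(1);
        queue.Add(3);
        queue.CompleteAdding(); // lets the consuming loop below terminate

        var taken = new List<int>();
        foreach (int value in queue.GetConsumingEnumerable())
        {
            taken.Add(value);
        }
        return taken;
    }
}
```

Because BlockingCollection<T> only calls TryAdd and TryTake on the wrapped collection, the items should come back in the order the backing collection chooses (here smallest first), not in insertion order.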

Servy answered Nov 14 '22 11:11



ParallelExtensionsExtras contains several custom TaskSchedulers that could be helpful either directly or as a base for your own scheduler.

Specifically, there are two schedulers that may be interesting for you:

  • QueuedTaskScheduler, which allows you to schedule Tasks at different priorities, but doesn't allow changing the priority of enqueued Tasks.
  • ReprioritizableTaskScheduler, which doesn't have different priorities, but allows you to move a specific Task to the front or to the back of the queue. (Though changing priority is O(n) in the number of currently waiting Tasks, which could be a problem if you had many Tasks at the same time.)
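
To make the second idea concrete, here is a minimal sketch of a scheduler built only on the standard TaskScheduler base class, in which a waiting Task can be moved to the front of the queue. It is not the ParallelExtensionsExtras source; `MoveToFrontScheduler` and `SchedulerDemo` are hypothetical names, and the linked-list queue is an assumption made to show where the O(n) cost comes from.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch: tasks wait in a linked list and are run on the thread pool.
public sealed class MoveToFrontScheduler : TaskScheduler
{
    private readonly object gate = new object();
    private readonly LinkedList<Task> pending = new LinkedList<Task>();

    protected override void QueueTask(Task task)
    {
        lock (gate) { pending.AddLast(task); }
        // One work item per queued task; each work item runs whatever is at the front.
        ThreadPool.QueueUserWorkItem(_ => RunNext());
    }

    private void RunNext()
    {
        Task next;
        lock (gate)
        {
            if (pending.Count == 0) return; // the task was already run inline
            next = pending.First.Value;
            pending.RemoveFirst();
        }
        TryExecuteTask(next);
    }

    // Moves a waiting task to the front. LinkedList.Remove searches the list,
    // which is the O(n) cost in the number of waiting tasks.
    public bool Prioritize(Task task)
    {
        lock (gate)
        {
            if (!pending.Remove(task)) return false; // not waiting (running or finished)
            pending.AddFirst(task);
            return true;
        }
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        if (taskWasPreviouslyQueued)
        {
            // Only inline a queued task if it can still be pulled out of the queue.
            lock (gate) { if (!pending.Remove(task)) return false; }
        }
        return TryExecuteTask(task);
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        lock (gate) { return new List<Task>(pending); }
    }
}

public static class SchedulerDemo
{
    public static int Run()
    {
        var scheduler = new MoveToFrontScheduler();
        Task<int> task = Task.Factory.StartNew(
            () => 42, CancellationToken.None, TaskCreationOptions.None, scheduler);
        // While a task is still waiting, scheduler.Prioritize(task) would move it to the front.
        return task.Result;
    }
}
```

Reprioritizing only has a visible effect while tasks are actually waiting, for example when the thread pool is saturated; once a task has started or finished, `Prioritize` returns false.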
svick answered Nov 14 '22 11:11
