Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

C# producer/consumer / observer?

I have a producer / consumer queue, except that there are specific types of objects. So not just any consumer can consume an added object. I don't want to make a specific queue for each type, as there are too many. (It sort of stretches the definition of producer/consumer, but I'm not sure what the correct term is.)

Is there such a thing as an EventWaitHandle which allows pulses with a parameter? e.g. myHandle.Set(AddedType = "foo"). Right now I'm using Monitor.Wait and then each consumer checks to see if the pulse was actually intended for them, but that seems kind of pointless.

A pseduocode version of what I have now:

class MyWorker {
    public string MyType {get; set;}
    public static Dictionary<string, MyInfo> data;

    public static void DoWork(){
        while(true){
             if(Monitor.Wait(data, timeout)){
                   if (data.ContainsKey(MyType)){
                        // OK, do work
                   }
             }
        }
    }
}

As you can see, I might get pulses when other stuff is added to the dict. I only care when MyType is added to the dict. Is there a way to do that? It's not a huge deal, but, for example, I have to manually handle timeouts now, because each get of the lock could succeed within the timeout, but MyType is never added to the dict within timeout.

like image 979
Xodarap Avatar asked Dec 16 '10 15:12

Xodarap


2 Answers

This is an interesting question. It sounds like the key to the solution is a blocking variant of a priority queue. Java has the PriorityBlockingQueue, but unfortunately the equivalent for the .NET BCL is nonexistent. Once you have one, however, the implementation is easy.

class MyWorker 
{
    public string MyType {get; set;}
    public static PriorityBlockingQueue<string, MyInfo> data; 

    public static void DoWork()
    {
        while(true)
        {
            MyInfo value;
            if (data.TryTake(MyType, timeout, out value))
            {
                // OK, do work
            }
        }
    }
}

Implementing a PriorityBlockingQueue is not terribly difficult. Following the same pattern as BlockingCollection by utilizing Add and Take style methods I came up with the following code.

public class PriorityBlockingQueue<TKey, TValue>
{
    private SortedDictionary<TKey, TValue> m_Dictionary = new SortedDictionary<TKey,TValue>();

    public void Add(TKey key, TValue value)
    {
        lock (m_Dictionary)
        {
            m_Dictionary.Add(key, value);
            Monitor.Pulse(m_Dictionary);
        }
    }

    public TValue Take(TKey key)
    {
        TValue value;
        TryTake(key, TimeSpan.FromTicks(long.MaxValue), out value);
        return value;
    }

    public bool TryTake(TKey key, TimeSpan timeout, out TValue value)
    {
        value = default(TValue);
        DateTime initial = DateTime.UtcNow;
        lock (m_Dictionary)
        {
            while (!m_Dictionary.TryGetValue(key, out value))
            {
                if (m_Dictionary.Count > 0) Monitor.Pulse(m_Dictionary); // Important!
                TimeSpan span = timeout - (DateTime.UtcNow - initial);
                if (!Monitor.Wait(m_Dictionary, span))
                {
                    return false;
                }
            }
            m_Dictionary.Remove(key);
            return true;
        }
    }
}

This was a quick implementation and it has a couple of problems. First, I have not tested it at all. Second, it uses a red-black tree (via SortedDictionary) as the underlying data structure. That means the TryTake method will have O(log(n)) complexity. Priority queues typically have O(1) removal complexity. The typically data structure of choice for priority queues is a heap, but I find that skip lists are actually better in practice for several reasons. Neither of these exist in the .NET BCL which is why I used a SortedDictionary instead despite its inferior performance in this scenario.

I should point out here that this does not actually solve the pointless Wait/Pulse behavior. It is simply encapsulated in the PriorityBlockingQueue class. But, at the very least this will certainly cleanup the core part of your code.

It did not appear like your code handled multiple objects per key, but that would be easy to add by using a Queue<MyInfo> instead of a plain old MyInfo when adding to the dictionary.

like image 105
Brian Gideon Avatar answered Oct 05 '22 16:10

Brian Gideon


It seems like you want to combine producer/consumer queue with an Observer pattern - generic consumer thread or threads reads from the queue, and then passes the event to the required code. In this instance you would not actually signal the Observer but just call it when the consumer thread identifies who is interested in a given work item.

Observer pattern in .Net is typically implemented using C# events. You would just need to call the event handler for the object and one or more observers would get invoked through it. The target code would first have to register itself with the observed object by adding itself to the event for notification on arrival of work.

like image 23
Steve Townsend Avatar answered Oct 05 '22 15:10

Steve Townsend