
Multiple threads accessing IEnumerable using yield

I am using a third-party library that iterates over some very large flat files, which can take many minutes. The library provides an enumerator, so you can yield each result and process it while the enumerator extracts the next item from the flat file.

For example:

IEnumerable<object> GetItems()
{
    var cursor = new Cursor();

    try
    {
        cursor.Open();

        while (!cursor.EOF)
        {
            // Placeholder: the real code yields the item built from the current record
            yield return new object();

            cursor.MoveNext();
        }

    }
    finally
    {
        if (cursor.IsOpen)
        {
            cursor.Close();
        }
    }
}

What I am trying to achieve is to have two consumers of the same enumerable, so that I don't have to extract the information twice and each consumer can still process each item as it arrives, without having to wait for all the items to arrive at once.

IEnumerable<object> items = GetItems();

// Thread needs a delegate, so wrap each call in a lambda
new Thread(() => SaveToDatabase(items)).Start();
new Thread(() => SaveSomewhereElse(items)).Start();

I guess what I am trying to achieve is something like:

"If the item the consumer is asking for has already been extracted, then yield it; otherwise move next and wait." However, I am conscious of possible MoveNext() clashes between the two threads.

Does something like this already exist? If not, any thoughts on how it could be achieved?

Thanks

Mark Vickery asked Oct 26 '12 14:10


1 Answer

The Pipelines pattern, implemented with .NET 4's BlockingCollection<T> and TPL Tasks, is what you are looking for. See my answer with a complete example in this StackOverflow post.

Example: 3 simultaneous consumers

private readonly BlockingCollection<string> queue = new BlockingCollection<string>();

public void Start()
{
    var producerWorker = Task.Factory.StartNew(() => ProducerImpl());
    var consumer1 = Task.Factory.StartNew(() => ConsumerImpl());
    var consumer2 = Task.Factory.StartNew(() => ConsumerImpl());
    var consumer3 = Task.Factory.StartNew(() => ConsumerImpl());

    Task.WaitAll(producerWorker, consumer1, consumer2, consumer3);
}

private void ProducerImpl()
{
    // ReadItems() is a placeholder for your own code that
    // 1. reads raw data from a file and
    // 2. preprocesses it
    foreach (string item in ReadItems())
    {
        // 3. Add the item to the queue
        queue.Add(item);
    }

    // Signal that no more items are coming, so the consumers' loops can end
    queue.CompleteAdding();
}

// ConsumerImpl must be thread-safe
// to allow launching multiple consumers simultaneously
private void ConsumerImpl()
{
    // Blocks until an item is available; ends once adding is completed and the queue is empty
    foreach (var item in queue.GetConsumingEnumerable())
    {
        // TODO
    }
}
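
One caveat with the example above: consumers that share a single BlockingCollection<T> compete for items, so each item is taken by exactly one of them. If every consumer needs to see every item, as with the two savers in the question, one option is to give each consumer its own queue and have the producer add each item to all of them. A minimal sketch, assuming that setup (the Broadcast class, its Run method and the boundedCapacity parameter are names made up for this illustration, not part of any library):

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper: enumerates 'source' once and hands every item to every consumer.
public static class Broadcast
{
    public static void Run<T>(IEnumerable<T> source, int boundedCapacity,
                              params Action<IEnumerable<T>>[] consumers)
    {
        // One bounded queue per consumer, so each consumer sees the full sequence.
        BlockingCollection<T>[] queues = consumers
            .Select(_ => new BlockingCollection<T>(boundedCapacity))
            .ToArray();

        // Start each consumer on its own task, reading from its own queue.
        Task[] tasks = consumers
            .Select((consume, i) => Task.Factory.StartNew(
                () => consume(queues[i].GetConsumingEnumerable()),
                TaskCreationOptions.LongRunning))
            .ToArray();

        try
        {
            // The calling thread acts as the producer.
            foreach (T item in source)
            {
                foreach (BlockingCollection<T> queue in queues)
                {
                    queue.Add(item); // blocks while this consumer's queue is full
                }
            }
        }
        finally
        {
            // Always signal completion so the consumers' loops can end.
            foreach (BlockingCollection<T> queue in queues)
            {
                queue.CompleteAdding();
            }
        }

        Task.WaitAll(tasks);
    }
}
```

It could be called along the lines of Broadcast.Run(GetItems(), 100, SaveToDatabase, SaveSomewhereElse), assuming both methods accept an IEnumerable<object>. The bounded capacity keeps memory in check for very large files, at the cost that the slowest consumer sets the pace. A consumer that stops reading early would leave the producer blocked once its queue fills, which this sketch does not handle.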

If something is still not clear, please let me know.
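
The idea from the question ("yield it if it is already extracted, otherwise move next and wait") can also be sketched directly, as a wrapper that caches items and guards the underlying MoveNext() with a lock. This is only an illustration under assumptions: SharedEnumerable<T> is a hypothetical name, not an existing framework type, and the buffer keeps every item in memory, which may not suit very large files.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;

// Hypothetical wrapper: several threads can enumerate it, but the source is read only once.
public sealed class SharedEnumerable<T> : IEnumerable<T>, IDisposable
{
    private readonly object gate = new object();
    private readonly List<T> buffer = new List<T>(); // items extracted so far
    private readonly IEnumerator<T> source;
    private bool exhausted;

    public SharedEnumerable(IEnumerable<T> source)
    {
        this.source = source.GetEnumerator();
    }

    public IEnumerator<T> GetEnumerator()
    {
        // Each reader tracks its own position in the shared buffer.
        for (int index = 0; ; index++)
        {
            T item;
            bool found;
            lock (gate) // the lock is released before yielding to the caller
            {
                found = TryGet(index, out item);
            }
            if (!found) yield break;
            yield return item;
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    // Must be called while holding 'gate'.
    private bool TryGet(int index, out T item)
    {
        if (index == buffer.Count)
        {
            // Not extracted yet: advance the source while other readers wait on the lock.
            if (exhausted || !source.MoveNext())
            {
                exhausted = true;
                item = default(T);
                return false;
            }
            buffer.Add(source.Current);
        }
        item = buffer[index]; // already extracted: serve it from the buffer
        return true;
    }

    public void Dispose()
    {
        lock (gate)
        {
            exhausted = true;
            source.Dispose();
        }
    }
}
```

Because the lock is held while the source advances, only one thread ever calls MoveNext() at a time, and a reader that is ahead simply waits for the next item. Compared with the queue-based pipeline, the trade-off is that nothing is ever removed from the buffer.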

[Image: high-level diagram of the pipeline flow]

sll answered Sep 21 '22 01:09