Consuming BlockingCollection with multiple consumers

I have a program as follows:

    class Program
    {
        public static int TaskCount { get; set; }
        public static BlockingCollection<string> queue = new BlockingCollection<string>(new ConcurrentQueue<string>());
        static void Main(string[] args)
        {
            TaskCount = 3;
            Task.Factory.StartNew(() => Producer());

            for (int i = 0; i < TaskCount; i++)
                Task.Factory.StartNew(() => Consumer());
            Console.ReadKey();
        }

        private static void Producer()
        {
            using (StreamWriter sw = File.AppendText(@"C:\pcadder.txt"))
            {
                for (int i = 0; i < 15; i++)
                {
                    queue.Add("Item: " + (i+1).ToString());
                    var message = string.Format("{2}.Item added: Item {0} at {1}", (i+1).ToString(), DateTime.Now.ToString("yyyy/MM/dd hh:mm:ss.ffffff"),i+1);
                    Console.WriteLine(message);
                    sw.WriteLine(message);

                }
                queue.CompleteAdding();
            }
        }
        private static void Consumer()
        {
            int count = 1;
            foreach (var item in queue.GetConsumingEnumerable())
            {
                var message = string.Format("{3}.Item taken: {0} at {1} by thread {2}.", item, DateTime.Now.ToString("yyyy/MM/dd hh:mm:ss.ffffff"),
                        Thread.CurrentThread.ManagedThreadId,count);
                Console.WriteLine(message);

                using (StreamWriter sw = File.AppendText(@"C:\pctaker.txt"))
                    sw.WriteLine(message);
                count += 1;
            }
        }
    }

The output:

1.Item added: Item 1 at 2017.07.06 09:58:49.784734
2.Item added: Item 2 at 2017.07.06 09:58:49.784734
3.Item added: Item 3 at 2017.07.06 09:58:49.784734
4.Item added: Item 4 at 2017.07.06 09:58:49.784734
5.Item added: Item 5 at 2017.07.06 09:58:49.784734
6.Item added: Item 6 at 2017.07.06 09:58:49.784734
7.Item added: Item 7 at 2017.07.06 09:58:49.784734
8.Item added: Item 8 at 2017.07.06 09:58:49.784734
9.Item added: Item 9 at 2017.07.06 09:58:49.784734
10.Item added: Item 10 at 2017.07.06 09:58:49.784734
11.Item added: Item 11 at 2017.07.06 09:58:49.784734
12.Item added: Item 12 at 2017.07.06 09:58:49.784734
13.Item added: Item 13 at 2017.07.06 09:58:49.784734
14.Item added: Item 14 at 2017.07.06 09:58:49.784734
15.Item added: Item 15 at 2017.07.06 09:58:49.784734

1.Item taken: Item: 3 at 2017.07.06 09:58:49.784734 by thread 7.
1.Item taken: Item: 2 at 2017.07.06 09:58:49.784734 by thread 4.
1.Item taken: Item: 1 at 2017.07.06 09:58:49.784734 by thread 5.
2.Item taken: Item: 5 at 2017.07.06 09:58:49.784734 by thread 4.
2.Item taken: Item: 4 at 2017.07.06 09:58:49.784734 by thread 7.
2.Item taken: Item: 6 at 2017.07.06 09:58:49.784734 by thread 5.
3.Item taken: Item: 7 at 2017.07.06 09:58:49.784734 by thread 4.
3.Item taken: Item: 8 at 2017.07.06 09:58:49.784734 by thread 7.
3.Item taken: Item: 9 at 2017.07.06 09:58:49.784734 by thread 5.
4.Item taken: Item: 11 at 2017.07.06 09:58:49.784734 by thread 7.
4.Item taken: Item: 12 at 2017.07.06 09:58:49.784734 by thread 5.
5.Item taken: Item: 13 at 2017.07.06 09:58:49.784734 by thread 7.
5.Item taken: Item: 14 at 2017.07.06 09:58:49.784734 by thread 5.
6.Item taken: Item: 15 at 2017.07.06 09:58:49.784734 by thread 7.

Almost every time I run the program, one item is missing from the consumer logs (here, Item 10 is missing). I couldn't figure out why this is happening.

  1. Why is this item not processed?
  2. When multiple tasks are used as consumers, is the FIFO processing order lost? If I want to keep (or force) FIFO processing inside the consumer method, should I avoid using multiple tasks? (Processing may include I/O and networking operations.)
ibubi asked Mar 08 '23 16:03

1 Answer

Here:

using (StreamWriter sw = File.AppendText(@"C:\pctaker.txt"))
    sw.WriteLine(message);

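You write to the same file rapidly from multiple threads. That is not a good idea, and this code actually throws an exception (an IOException, when one thread tries to open the file while another thread still has it open). It goes unnoticed because you don't handle any exceptions and it happens inside a task on a background thread, so it does not crash your application. The consumer that hit the exception stops, and the item it had already taken never reaches the log file. That is why items are missing from your log. You can write to the same file safely like this, for example: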

// In Main: create the writer once, outside `Consumer`, and wrap it in a synchronized writer
using (var taker = TextWriter.Synchronized(File.AppendText(@"pctaker.txt"))) {
    TaskCount = 3;
    Task.Factory.StartNew(() => Producer());
    for (int i = 0; i < TaskCount; i++)
        // pass the shared writer to each consumer
        Task.Factory.StartNew(() => Consumer(taker));
    // the writer is disposed only after a key is pressed, so consumers can use it until then
    Console.ReadKey();
}

private static void Consumer(TextWriter writer)
{
    int count = 1;
    foreach (var item in queue.GetConsumingEnumerable())
    {
        var message = string.Format("{3}.Item taken: {0} at {1} by thread {2}.", item, DateTime.Now.ToString("yyyy/MM/dd hh:mm:ss.ffffff"),
                Thread.CurrentThread.ManagedThreadId, count);
        Console.WriteLine(message);
        writer.WriteLine(message);
        writer.Flush();
        count += 1;
    }
}

Or just put a lock around writing to the file.
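
A minimal sketch of the lock variant follows. The names `LockedLogDemo`, `FileLock` and `Run`, the simplified message format, and the temporary file name are assumptions made for illustration and are not part of the original program. The sketch also keeps the task handles and waits on them, so that a consumer exception would surface instead of going unnoticed.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class LockedLogDemo
{
    // Hypothetical lock object guarding every write to the shared log file.
    private static readonly object FileLock = new object();

    // Runs one producer and several consumers; returns the number of lines in the log.
    public static int Run(string path, int itemCount, int consumerCount)
    {
        var queue = new BlockingCollection<string>(new ConcurrentQueue<string>());

        var producer = Task.Run(() =>
        {
            for (int i = 1; i <= itemCount; i++)
                queue.Add("Item: " + i);
            queue.CompleteAdding();
        });

        var consumers = Enumerable.Range(0, consumerCount)
            .Select(_ => Task.Run(() =>
            {
                foreach (var item in queue.GetConsumingEnumerable())
                {
                    var message = string.Format("Item taken: {0} by thread {1}.",
                        item, Thread.CurrentThread.ManagedThreadId);

                    // Only one thread at a time may open the file and append to it.
                    lock (FileLock)
                    {
                        File.AppendAllText(path, message + Environment.NewLine);
                    }
                }
            }))
            .ToArray();

        // Waiting on the tasks rethrows any exception from them
        // (wrapped in an AggregateException) instead of hiding it.
        producer.Wait();
        Task.WaitAll(consumers);

        return File.ReadAllLines(path).Length;
    }

    static void Main()
    {
        // Hypothetical file name in the temp directory, removed before each run.
        var path = Path.Combine(Path.GetTempPath(), "pctaker-demo.txt");
        File.Delete(path);
        Console.WriteLine(Run(path, 15, 3) + " lines written");
    }
}
```

Opening the file for every message is slow compared with one shared writer, but it keeps the sketch close to the original `Consumer`; the lock is what prevents two threads from opening the file at the same time.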

As for the second question: consumers still pull items from the queue in FIFO order, but with multiple consumers the order of processing is not guaranteed, because all consumers process items in parallel. For example, consumer A pulls item 1 and consumer B pulls item 2 at the same time. Consumer A takes 100 ms to process item 1, while consumer B takes 10 ms to process item 2. As a result, item 2 is processed (that is, written to your log) before item 1. If processing must happen strictly in FIFO order, use a single consumer.
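
The effect can be reproduced with a small sketch. The class `OrderingDemo`, the `Run` method, and the simulated processing times (100 ms for odd items, 10 ms for even items) are assumptions chosen for illustration only. `Run` returns the items in the order in which their processing finished.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class OrderingDemo
{
    // Returns the items in the order their processing finished.
    public static List<int> Run(int consumerCount)
    {
        var queue = new BlockingCollection<int>(new ConcurrentQueue<int>());
        var finished = new ConcurrentQueue<int>();

        // Fill the queue before any consumer starts.
        for (int i = 1; i <= 6; i++)
            queue.Add(i);
        queue.CompleteAdding();

        var consumers = Enumerable.Range(0, consumerCount)
            .Select(_ => Task.Run(() =>
            {
                foreach (var item in queue.GetConsumingEnumerable())
                {
                    // Simulated work (an assumption for illustration):
                    // odd items are slow, even items are fast.
                    Thread.Sleep(item % 2 == 1 ? 100 : 10);
                    finished.Enqueue(item);
                }
            }))
            .ToArray();

        Task.WaitAll(consumers);
        return finished.ToList();
    }

    static void Main()
    {
        // A single consumer finishes items in queue order: 1, 2, 3, 4, 5, 6.
        Console.WriteLine("1 consumer:  " + string.Join(", ", Run(1)));
        // With two consumers the completion order depends on timing and can differ.
        Console.WriteLine("2 consumers: " + string.Join(", ", Run(2)));
    }
}
```

With one consumer the completion order always matches the queue order. With two consumers every item is still processed exactly once, but fast items typically finish ahead of slow items that were taken earlier.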

Evk answered Mar 12 '23 01:03
