I have a program as follows
class Program
{
public static int TaskCount { get; set; }
public static BlockingCollection<string> queue = new BlockingCollection<string>(new ConcurrentQueue<string>());
static void Main(string[] args)
{
TaskCount = 3;
Task.Factory.StartNew(() => Producer());
for (int i = 0; i < TaskCount; i++)
Task.Factory.StartNew(() => Consumer());
Console.ReadKey();
}
private static void Producer()
{
using (StreamWriter sw = File.AppendText(@"C:\pcadder.txt"))
{
for (int i = 0; i < 15; i++)
{
queue.Add("Item: " + (i+1).ToString());
var message = string.Format("{2}.Item added: Item {0} at {1}", (i+1).ToString(), DateTime.Now.ToString("yyyy/MM/dd hh:mm:ss.ffffff"),i+1);
Console.WriteLine(message);
sw.WriteLine(message);
}
queue.CompleteAdding();
}
}
private static void Consumer()
{
int count = 1;
foreach (var item in queue.GetConsumingEnumerable())
{
var message = string.Format("{3}.Item taken: {0} at {1} by thread {2}.", item, DateTime.Now.ToString("yyyy/MM/dd hh:mm:ss.ffffff"),
Thread.CurrentThread.ManagedThreadId,count);
Console.WriteLine(message);
using (StreamWriter sw = File.AppendText(@"C:\pctaker.txt"))
sw.WriteLine(message);
count += 1;
}
}
}
1.Item added: Item 1 at 2017.07.06 09:58:49.784734
2.Item added: Item 2 at 2017.07.06 09:58:49.784734
3.Item added: Item 3 at 2017.07.06 09:58:49.784734
4.Item added: Item 4 at 2017.07.06 09:58:49.784734
5.Item added: Item 5 at 2017.07.06 09:58:49.784734
6.Item added: Item 6 at 2017.07.06 09:58:49.784734
7.Item added: Item 7 at 2017.07.06 09:58:49.784734
8.Item added: Item 8 at 2017.07.06 09:58:49.784734
9.Item added: Item 9 at 2017.07.06 09:58:49.784734
10.Item added: Item 10 at 2017.07.06 09:58:49.784734
11.Item added: Item 11 at 2017.07.06 09:58:49.784734
12.Item added: Item 12 at 2017.07.06 09:58:49.784734
13.Item added: Item 13 at 2017.07.06 09:58:49.784734
14.Item added: Item 14 at 2017.07.06 09:58:49.784734
15.Item added: Item 15 at 2017.07.06 09:58:49.784734
1.Item taken: Item: 3 at 2017.07.06 09:58:49.784734 by thread 7.
1.Item taken: Item: 2 at 2017.07.06 09:58:49.784734 by thread 4.
1.Item taken: Item: 1 at 2017.07.06 09:58:49.784734 by thread 5.
2.Item taken: Item: 5 at 2017.07.06 09:58:49.784734 by thread 4.
2.Item taken: Item: 4 at 2017.07.06 09:58:49.784734 by thread 7.
2.Item taken: Item: 6 at 2017.07.06 09:58:49.784734 by thread 5.
3.Item taken: Item: 7 at 2017.07.06 09:58:49.784734 by thread 4.
3.Item taken: Item: 8 at 2017.07.06 09:58:49.784734 by thread 7.
3.Item taken: Item: 9 at 2017.07.06 09:58:49.784734 by thread 5.
4.Item taken: Item: 11 at 2017.07.06 09:58:49.784734 by thread 7.
4.Item taken: Item: 12 at 2017.07.06 09:58:49.784734 by thread 5.
5.Item taken: Item: 13 at 2017.07.06 09:58:49.784734 by thread 7.
5.Item taken: Item: 14 at 2017.07.06 09:58:49.784734 by thread 5.
6.Item taken: Item: 15 at 2017.07.06 09:58:49.784734 by thread 7.
After almost every running the program, I have one item missing in consumer logs.(Here, Item 10
is missing). I could't catch why this is happening.
Here
using (StreamWriter sw = File.AppendText(@"C:\pctaker.txt"))
sw.WriteLine(message);
You rapidly write to the same file from multiple threads. It's not a good idea and this code actually throws an exception. It goes unnoticed in your code because you don't handle any exceptions and it happens in background thread so does not crash your application. That answers why you have missing items in your log. You can write to the same file like that for example:
// create it outside `Consumer` and make synchronized
using (var taker = TextWriter.Synchronized(File.AppendText(@"pctaker.txt"))) {
TaskCount = 3;
Task.Factory.StartNew(() => Producer());
//Producer();
for (int i = 0; i < TaskCount; i++)
// pass to consumer
Task.Factory.StartNew(() => Consumer(taker));
Console.ReadKey();
}
private static void Consumer(TextWriter writer)
{
int count = 1;
foreach (var item in queue.GetConsumingEnumerable())
{
var message = string.Format("{3}.Item taken: {0} at {1} by thread {2}.", item, DateTime.Now.ToString("yyyy/MM/dd hh:mm:ss.ffffff"),
Thread.CurrentThread.ManagedThreadId, count);
Console.WriteLine(message);
writer.WriteLine(message);
writer.Flush();
count += 1;
}
}
Or just put a lock
around writing to the file.
As for second question - consumers still pull items in FIFO order, but since you have multiple consumers - order of processing is of course not guaranteed, because all consumers process items in parallel. Consumer A pulls item 1, consumer B pulls item 2 at the same time. Consumer A takes 100ms to process item 1, consumer B takes 10ms to process item 2. In result - item 2 is processed (that is - written to your log) before item 1.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With