Cross-posted to http://social.msdn.microsoft.com/Forums/en-US/tpldataflow/thread/89b3f71d-3777-4fad-9c11-50d8dc81a4a9
I know... I'm not really using TplDataflow to its maximum potential. At the moment I'm simply using BufferBlock as a safe queue for message passing, where producer and consumer are running at different rates. I'm seeing some strange behaviour that leaves me stumped as to how to proceed.
    private BufferBlock<object> messageQueue = new BufferBlock<object>();

    public void Send(object message)
    {
        var accepted = messageQueue.Post(message);
        logger.Info("Send message was called qlen = {0} accepted={1}",
            messageQueue.Count, accepted);
    }

    public async Task<object> GetMessageAsync()
    {
        try
        {
            var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));
            //despite messageQueue.Count>0 next line
            //occasionally does not execute
            logger.Info("message received");
            //.......
        }
        catch (TimeoutException)
        {
            //do something
        }
    }
In the code above (which is part of a 2000-line distributed solution), Send is being called periodically every 100 ms or so. This means an item is Posted to messageQueue around 10 times a second. This is verified. However, occasionally it appears that ReceiveAsync does not complete within the timeout (i.e. the Post is not causing ReceiveAsync to complete) and a TimeoutException is raised after 30 s. At this point, messageQueue.Count is in the hundreds. This is unexpected. This problem has been observed at slower rates of posting too (1 post/second) and usually happens before 1000 items have passed through the BufferBlock.
So, to work around this issue, I am using the following code, which works, but occasionally causes 1 s of latency when receiving (due to the bug above occurring):
    public async Task<object> GetMessageAsync()
    {
        try
        {
            object m;
            var attempts = 0;
            for (; ; )
            {
                try
                {
                    m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(1));
                }
                catch (TimeoutException)
                {
                    attempts++;
                    if (attempts >= 30) throw;
                    continue;
                }
                break;
            }
            logger.Info("message received");
            //.......
        }
        catch (TimeoutException)
        {
            //do something
        }
    }
This looks like a race condition in TDF to me, but I can't get to the bottom of why this doesn't occur in the other places where I use BufferBlock in a similar fashion. Experimentally changing from ReceiveAsync to Receive doesn't help. I haven't checked, but I imagine in isolation, the code above works perfectly. It's a pattern I've seen documented in "Introduction to TPL Dataflow" (tpldataflow.docx).
What can I do to get to the bottom of this? Are there any metrics that might help infer what's happening? If I can't create a reliable test case, what more information can I offer?
Help!
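For anyone who wants to try reproducing this in isolation, here is a minimal sketch of a stand-alone harness. It is not taken from my real solution: the class name BufferBlockRepro, the five-minute run length, and the console logging are all hypothetical choices, and I have not confirmed that it triggers the problem. It simply posts at roughly the same rate as Send and logs the queue length whenever ReceiveAsync times out.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-alone harness; names and durations are illustrative only.
static class BufferBlockRepro
{
    static async Task Main()
    {
        var queue = new BufferBlock<object>();

        // Assumption: stop after five minutes; adjust as needed.
        using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(5));

        // Producer: posts roughly every 100 ms, like Send in the question.
        var producer = Task.Run(async () =>
        {
            try
            {
                for (var i = 0; !cts.IsCancellationRequested; i++)
                {
                    queue.Post((object)i);
                    await Task.Delay(100, cts.Token);
                }
            }
            catch (OperationCanceledException)
            {
                // Run length elapsed; stop producing.
            }
        });

        long received = 0, timeouts = 0;

        // Consumer: same ReceiveAsync-with-timeout call as the workaround.
        while (!cts.IsCancellationRequested)
        {
            try
            {
                await queue.ReceiveAsync(TimeSpan.FromSeconds(1));
                received++;
            }
            catch (TimeoutException)
            {
                // A timeout while qlen > 0 would match the behaviour described above.
                timeouts++;
                Console.WriteLine("timeout #{0}: received={1} qlen={2}",
                    timeouts, received, queue.Count);
            }
        }

        await producer;
        Console.WriteLine("done: received={0} timeouts={1} qlen={2}",
            received, timeouts, queue.Count);
    }
}
```

With a healthy queue the producer posts ten times faster than the timeout, so any timeout logged with a non-zero qlen would be worth attaching to a bug report.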
Stephen seems to think the following is the solution:

    var m = await messageQueue.ReceiveAsync();

instead of:

    var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));

Can you confirm or deny this?