Teaser: guys, this question is not about how to implement a retry policy. It's about correctly completing a TPL Dataflow block.
This question is mostly a continuation of my previous question, Retry policy within ITargetBlock. The answer to that question was @svick's smart solution that uses a TransformBlock (source) and a TransformManyBlock (target). The only problem left is to complete this block the right way: wait for all the retries to finish first, and then complete the target block. Here is what I ended up with (it's just a snippet; don't pay too much attention to the non-thread-safe retries set):
var retries = new HashSet<RetryableMessage<TInput>>();
TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null;
target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>(
async message =>
{
try
{
var result = new[] { await transform(message.Data) };
retries.Remove(message);
return result;
}
catch (Exception ex)
{
message.Exceptions.Add(ex);
if (message.RetriesRemaining == 0)
{
if (failureHandler != null)
failureHandler(message.Exceptions);
retries.Remove(message);
}
else
{
retries.Add(message);
message.RetriesRemaining--;
Task.Delay(retryDelay)
.ContinueWith(_ => target.Post(message));
}
return null;
}
}, dataflowBlockOptions);
source.LinkTo(target);
source.Completion.ContinueWith(async _ =>
{
while (target.InputCount > 0 || retries.Any())
await Task.Delay(100);
target.Complete();
});
The idea is to poll until no messages are waiting to be processed and no messages are awaiting a retry. But I don't like the idea of polling.
Yes, I can encapsulate the logic of adding/removing retries into a separate class, and even, for example, perform some action when the set of retries becomes empty, but how do I deal with the target.InputCount > 0 condition? There is no callback that gets called when the block has no pending messages, so it seems that checking target.InputCount in a loop with a small delay is the only option.
Does anybody know a smarter way to achieve this?
Maybe a ManualResetEvent can do the trick for you.
Add a public property to TransformManyBlock:
private ManualResetEvent _signal = new ManualResetEvent(false);
public ManualResetEvent Signal { get { return _signal; } }
And here you go:
var retries = new HashSet<RetryableMessage<TInput>>();
TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null;
target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>(
async message =>
{
try
{
var result = new[] { await transform(message.Data) };
retries.Remove(message);
// Sets the state of the event to signaled, allowing one or more waiting threads to proceed
if(!retries.Any()) Signal.Set();
return result;
}
catch (Exception ex)
{
message.Exceptions.Add(ex);
if (message.RetriesRemaining == 0)
{
if (failureHandler != null)
failureHandler(message.Exceptions);
retries.Remove(message);
// Sets the state of the event to signaled, allowing one or more waiting threads to proceed
if(!retries.Any()) Signal.Set();
}
else
{
retries.Add(message);
message.RetriesRemaining--;
Task.Delay(retryDelay)
.ContinueWith(_ => target.Post(message));
}
return null;
}
}, dataflowBlockOptions);
source.LinkTo(target);
source.Completion.ContinueWith(_ =>
{
// Blocks the current thread until the current WaitHandle receives a signal.
target.Signal.WaitOne();
target.Complete();
});
I am not sure where your target.InputCount is set. At the place where you change target.InputCount, you can add the following code:
if(InputCount == 0) Signal.Set();
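As far as I know, InputCount is a read-only property maintained by the block itself, so there may be no place in your own code to hook into it. One way around that, and around blocking a thread in WaitOne, is to count every message from the moment it enters the pipeline until it either succeeds or runs out of retries. Below is a minimal sketch of that idea. PendingTracker and its members are hypothetical names invented for illustration; they are not part of TPL Dataflow.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper (not part of TPL Dataflow): counts in-flight messages
// and exposes a task that completes once the source has been sealed and
// the in-flight count has dropped to zero.
public sealed class PendingTracker
{
    private readonly object _gate = new object();
    private readonly TaskCompletionSource<bool> _drained =
        new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    private int _pending;
    private bool _sealed;

    // Completes when no more messages can arrive and none are in flight.
    public Task Drained { get { return _drained.Task; } }

    // Call once per message, before the message reaches the target block.
    public void Add()
    {
        lock (_gate) { _pending++; }
    }

    // Call when a message succeeds or has exhausted its retries.
    public void Remove()
    {
        lock (_gate)
        {
            _pending--;
            SignalIfDrained();
        }
    }

    // Call when the source has completed and will produce nothing more.
    public void Seal()
    {
        lock (_gate)
        {
            _sealed = true;
            SignalIfDrained();
        }
    }

    // Must be called while holding _gate.
    private void SignalIfDrained()
    {
        if (_sealed && _pending == 0)
            _drained.TrySetResult(true);
    }
}
```

Assuming the source block is where messages are wrapped, you would call Add() there, call Remove() in the target delegate on success or after the last failed retry (but not when a retry is scheduled), and in the source.Completion continuation call Seal(), await Drained, and then call target.Complete(). Because a message waiting in the target's input queue is still counted, the InputCount check is no longer needed.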
Combining hwcverwe's answer and JamieSee's comment could be the ideal solution.
First, you need to create more than one event:
var signal = new ManualResetEvent(false);
var completedEvent = new ManualResetEvent(false);
Then you have to create an observer and subscribe it to the TransformManyBlock, so you are notified when a relevant event happens:
var observer = new RetryingBlockObserver<TOutput>(completedEvent);
var observable = target.AsObservable();
observable.Subscribe(observer);
The observer can be quite simple:
private class RetryingBlockObserver<T> : IObserver<T> {
private ManualResetEvent completedEvent;
public RetryingBlockObserver(ManualResetEvent completedEvent) {
this.completedEvent = completedEvent;
}
public void OnCompleted() {
completedEvent.Set();
}
public void OnError(Exception error) {
//TODO
}
public void OnNext(T value) {
//TODO
}
}
And you can wait for either the signal, or completion (exhaustion of all the source items), or both:
source.Completion.ContinueWith(_ => {
// WaitAll takes an array of handles and blocks until all of them are set.
WaitHandle.WaitAll(new WaitHandle[] { completedEvent, signal });
// Or WaitHandle.WaitAny, depending on your needs!
target.Complete();
});
If you use WaitAny, you can inspect its return value (the index of the handle that was signaled) to understand which event was set, and react accordingly. You can also add other events to the code, passing them to the observer, so that it can set them when needed. You can differentiate your behaviour and respond differently when an error is raised, for example.
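To make the difference between the two calls concrete, here is a small standalone sketch (WaitHandleDemo and WhichFired are made-up names for illustration). WaitAny returns the array index of a signaled handle, or WaitHandle.WaitTimeout if the timeout elapses, while WaitAll only returns a bool saying whether every handle was signaled in time.

```csharp
using System;
using System.Threading;

public static class WaitHandleDemo
{
    // Returns the index of a signaled handle, or -1 if the wait timed out.
    public static int WhichFired(WaitHandle[] handles, int timeoutMs)
    {
        int index = WaitHandle.WaitAny(handles, timeoutMs);
        return index == WaitHandle.WaitTimeout ? -1 : index;
    }

    public static void Main()
    {
        var completedEvent = new ManualResetEvent(false);
        var signal = new ManualResetEvent(false);
        var handles = new WaitHandle[] { completedEvent, signal };

        // Only the second handle is set.
        signal.Set();

        // WaitAny reports which handle satisfied the wait (here, index 1).
        Console.WriteLine(WhichFired(handles, 1000));

        // WaitAll reports false because completedEvent is still unset.
        Console.WriteLine(WaitHandle.WaitAll(handles, 0));
    }
}
```

Keeping the handles in a fixed order lets you map the returned index back to a meaning, for example 0 for completion and 1 for the retry signal.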