Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

TPL Dataflow block which delays the forward of the message to the next block

I require a Dataflow block which delays the forward of the message to the next block based on the timestamp in the message (LogEntry).

This is what i came up with but it feels not right. Any suggestions for improvements?

  private IPropagatorBlock<LogEntry, LogEntry> DelayedForwardBlock()
    {
        var buffer = new ConcurrentQueue<LogEntry>();

        var source = new BufferBlock<LogEntry>();

        var target = new ActionBlock<LogEntry>(item =>
        {
            buffer.Enqueue(item);
        });


        Task.Run(() =>
            {
                LogEntry entry;
                while (true)
                {
                    entry = null;
                    if (buffer.TryPeek(out entry))
                    {
                        if (entry.UtcTimestamp < (DateTime.UtcNow - TimeSpan.FromMinutes(5)))
                        {
                            buffer.TryDequeue(out entry);
                            source.Post(entry);
                        }
                    }
                }
            });


        target.Completion.ContinueWith(delegate
        {
            LogEntry entry;
            while (buffer.TryDequeue(out entry))
            {
                source.Post(entry);
            }

            source.Complete();
        });

        return DataflowBlock.Encapsulate(target, source);
    }
like image 891
rudimenter Avatar asked Dec 25 '22 06:12

rudimenter


1 Answers

You could simply use a single TransformBlock that asynchronously waits out the delay using Task.Delay:

IPropagatorBlock<TItem, TItem> DelayedForwardBlock<TItem>(TimeSpan delay)
{
    return new TransformBlock<TItem, TItem>(async item =>
    {
        await Task.Delay(delay);
        return item;
    });
}

Usage:

var block = DelayedForwardBlock<LogEntry>(TimeSpan.FromMinutes(5));
like image 76
i3arnon Avatar answered Jan 11 '23 22:01

i3arnon