How to buffer a burst of events into fewer resulting actions




I want to reduce multiple events into a single delayed action. After some trigger occurs I expect some more similar triggers to come, but I prefer not to repeat the resulting delayed action. The action waits, to give a chance of completion to the burst.
The question: How can I do it in an elegant reusable way?
Till now I used a property to flag the event and trigger a delayed action like below:

public  void SomeMethod()
        SomeFlag = true; //this will intentionally return to the caller before completing the resulting buffered actions.
    private bool someFlag;
    public bool SomeFlag
        get { return someFlag; }
            if (someFlag != value)
                someFlag = value;
                if (value)

    public async void SomeDelayedMethod(int delay)
        //some bufferred work.
        await Task.Delay(delay);
        SomeFlag = false;

below is a shorter way, but still not generic or reusable... I want something concise that packages the actions and the flag, and keeps the functionality (returning to the caller before execution is complete (like today)). I also need to be able to pass an object reference to this action)

 public void SerializeAccountsToConfig()
        if (!alreadyFlagged)
            alreadyFlagged = true;
            SerializeDelayed(5000, Serialize);
    public async void SerializeDelayed(int delay, Action whatToDo)
        await Task.Delay(delay);

    private bool alreadyFlagged;
    private void Serialize()
        //some buferred work.
        //string json = JsonConvert.SerializeObject(Accounts, Formatting.Indented);
        //Settings1.Default.Accounts = json;
        alreadyFlagged = false;
2 Answers

Here's a thread-safe and reusable solution.

You can create an instance of DelayedSingleAction, and in the constructor you pass the action that you want to have performed. I believe this is thread safe, though there is a tiny risk that it will restart the timer just before commencing the action, but I think that risk would exist no matter what the solution is.

public class DelayedSingleAction
    private readonly Action _action;
    private readonly long _millisecondsDelay;
    private long _syncValue = 1;
    public DelayedSingleAction(Action action, long millisecondsDelay)
        _action = action;
        _millisecondsDelay = millisecondsDelay;

    private Task _waitingTask = null;
    private void DoActionAndClearTask(Task _)
        Interlocked.Exchange(ref _syncValue, 1);

    public void PerformAction()
        if (Interlocked.Exchange(ref _syncValue, 0) == 1)
            _waitingTask = Task.Delay(TimeSpan.FromMilliseconds(_millisecondsDelay))

    public Task Complete()
        return _waitingTask ?? Task.FromResult(0);

See this dotnetfiddle for an example which invokes one action continuously from multiple threads.


Since you're interested in RX here simple console app sample:

static void Main(string[] args)
    // event source
    var burstEvents = Observable.Interval(TimeSpan.FromMilliseconds(50));

    var subscription = burstEvents

        .Buffer(TimeSpan.FromSeconds(3)) // collect events 3 seconds
        //.Buffer(50) // or collect 50 events

        .Subscribe(events =>
            //Console.WriteLine(events.First()); // take only first event

            // or process event collection
            foreach (var e in events)
                Console.Write(e + " ");

