Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

C# async aggregate and dispatch

Tags:

c#

async-await

I'm having trouble assimilating the c# Task, async and await patterns.

Windows service, .NET v4.5.2 server-side.

I have a Windows service accepting a variety of sources of incoming records, arriving externally ad-hoc via a self-hosted web api. I would like to batch up these records and then forward them on to another service. If the number of batched records exceeds a threshold, the batch should be dispatched immediately. Furthermore, the batch as it stands should also be dispatched if a time interval has elapsed. This means that a record is never held for more than N seconds.

I'm struggling to fit this into a Task based async pattern.

In days gone by, I would have created a Thread, a ManualResetEvent and a System.Threading.Timer. The Thread would loop around a Wait on the reset event. The Timer would set the event when fired, as would the code doing the aggregation when the batch size exceeded the threshold. Following the Wait, the Thread would stop the Timer, do the dispatch (an HTTP Post), reset the Timer and clear the ManualResetEvent, the loop back and Wait.

However, I am seeing folk say that this is 'bad' as the Wait just blocks a valuable thread resource, and that async/await is my panacea.

First off, are they right? Is my way out-of-date and inefficient or can I JFDI?

I've found examples here for batching and here for tasks at intervals, but not a combination of the two.

Is this requirement actually compatible with async/await?

like image 259
Neil Moss Avatar asked Jan 06 '23 03:01

Neil Moss


1 Answers

Actually, you're almost doing the right thing, and they are also partially right.

What you should know is that you should avoid idle threads, with long waiting on events or waiting for I/O to complete (waiting on locks with few contention and fast statement blocks or spinning loops with compare-and-swap are usually OK).

What most of them don't know is that tasks are not magic, for instance, Task.Delay uses a Timer (more exactly, a System.Threading.Timer) and waiting on a non-complete task ends up using a ManualResetEventSlim (an improvement over ManualResetEvent, as it doesn't create a Win32 event unless explicitly asked for, e.g. ((IAsyncResult)task).AsyncWaitHandle).

So yes, your requirements are achievable with async/await, or tasks in general.

Runnable example at .NET Fiddle:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public class Record
{
    private int n;

    public Record(int n)
    {
        this.n = n;
    }

    public int N { get { return n; } }
}

public class RecordReceiver
{
    // Arbitrary constants
    // You should fetch value from configuration and define sensible defaults
    private static readonly int threshold = 5;
    // I chose a low value so the example wouldn't timeout in .NET Fiddle
    private static readonly TimeSpan timeout = TimeSpan.FromMilliseconds(100);

    // I'll use a Stopwatch to trace execution times
    private readonly Stopwatch sw = Stopwatch.StartNew();
    // Using a separate private object for locking
    private readonly object lockObj = new object();
    // The list of accumulated records to execute in a batch
    private List<Record> records = new List<Record>();
    // The most recent TCS to signal completion when:
    // - the list count reached the threshold
    // - enough time has passed
    private TaskCompletionSource<IEnumerable<Record>> batchTcs;
    // A CTS to cancel the timer-based task when the threshold is reached
    // Not strictly necessary, but it reduces resource usage
    private CancellationTokenSource delayCts;
    // The task that will be completed when a batch of records has been dispatched
    private Task dispatchTask;

    // This method doesn't use async/await,
    // because we're not doing an async flow here.
    public Task ReceiveAsync(Record record)
    {
        Console.WriteLine("Received record {0} ({1})", record.N, sw.ElapsedMilliseconds);
        lock (lockObj)
        {
            // When the list of records is empty, set up the next task
            //
            // TaskCompletionSource is just what we need, we'll complete a task
            // not when we've finished some computation, but when we reach some criteria
            //
            // This is the main reason this method doesn't use async/await
            if (records.Count == 0)
            {
                // I want the dispatch task to run on the thread pool

                // In .NET 4.6, there's TaskCreationOptions.RunContinuationsAsynchronously
                // .NET 4.6
                //batchTcs = new TaskCompletionSource<IEnumerable<Record>>(TaskCreationOptions.RunContinuationsAsynchronously);
                //dispatchTask = DispatchRecordsAsync(batchTcs.Task);

                // Previously, we have to set up a continuation task using the default task scheduler
                // .NET 4.5.2
                batchTcs = new TaskCompletionSource<IEnumerable<Record>>();
                var asyncContinuationsTask = batchTcs.Task
                    .ContinueWith(bt => bt.Result, TaskScheduler.Default);
                dispatchTask = DispatchRecordsAsync(asyncContinuationsTask);

                // Create a cancellation token source to be able to cancel the timer
                //
                // To be used when we reach the threshold, to release timer resources
                delayCts = new CancellationTokenSource();
                Task.Delay(timeout, delayCts.Token)
                    .ContinueWith(
                        dt =>
                        {
                            // When we hit the timer, take the lock and set the batch
                            // task as complete, moving the current records to its result
                            lock (lockObj)
                            {
                                // Avoid dispatching an empty list of records
                                //
                                // Also avoid a race condition by checking the cancellation token
                                //
                                // The race would be for the actual timer function to start before
                                // we had a chance to cancel it
                                if ((records.Count > 0) && !delayCts.IsCancellationRequested)
                                {
                                    batchTcs.TrySetResult(new List<Record>(records));
                                    records.Clear();
                                }
                            }
                        },
                        // Since our continuation function is fast, we want it to run
                        // ASAP on the same thread where the actual timer function runs
                        //
                        // Note: this is just a hint, but I trust it'll be favored most of the time
                        TaskContinuationOptions.ExecuteSynchronously);
                // Remember that we want our batch task to have continuations
                // running outside the timer thread, since dispatching records
                // is probably too much work for a timer thread.
            }
            // Actually store the new record somewhere
            records.Add(record);
            // When we reach the threshold, set the batch task as complete,
            // moving the current records to its result
            //
            // Also, cancel the timer task
            if (records.Count >= threshold)
            {
                batchTcs.TrySetResult(new List<Record>(records));
                delayCts.Cancel();
                records.Clear();
            }
            // Return the last saved dispatch continuation task
            //
            // It'll start after either the timer or the threshold,
            // but more importantly, it'll complete after it dispatches all records
            return dispatchTask;
        }
    }

    // This method uses async/await, since we want to use the async flow
    internal async Task DispatchRecordsAsync(Task<IEnumerable<Record>> batchTask)
    {
        // We expect it to return a task right here, since the batch task hasn't had
        // a chance to complete when the first record arrives
        //
        // Task.ConfigureAwait(false) allows us to run synchronously and on the same thread
        // as the completer, but again, this is just a hint
        //
        // Remember we've set our task to run completions on the thread pool?
        //
        // With .NET 4.6, completing a TaskCompletionSource created with
        // TaskCreationOptions.RunContinuationsAsynchronously will start scheduling
        // continuations either on their captured SynchronizationContext or TaskScheduler,
        // or forced to use TaskScheduler.Default
        //
        // Before .NET 4.6, completing a TaskCompletionSource could mean
        // that continuations ran withing the completer, especially when
        // Task.ConfigureAwait(false) was used on an async awaiter, or when
        // Task.ContinueWith(..., TaskContinuationOptions.ExecuteSynchronously) was used
        // to set up a continuation
        //
        // That's why, before .NET 4.6, we need to actually run a task for that effect,
        // and we used Task.ContinueWith without TaskContinuationOptions.ExecuteSynchronously
        // and with TaskScheduler.Default, to ensure it gets scheduled
        //
        // So, why am I using Task.ConfigureAwait(false) here anyway?
        // Because it'll make a difference if this method is run from within
        // a Windows Forms or WPF thread, or any thread with a SynchronizationContext
        // or TaskScheduler that schedules tasks on a dedicated thread
        var batchedRecords = await batchTask.ConfigureAwait(false);
        // Async methods are transformed into state machines,
        // much like iterator methods, but with async specifics
        //
        // What await actually does is:
        // - check if the awaitable is complete
        //   - if so, continue executing
        //     Note: if every awaited awaitable is complete along an async method,
        //           the method will complete synchronously
        //           This is only expectable with tasks that have already completed
        //           or I/O that is always ready, e.g. MemoryStream
        //   - if not, return a task and schedule a continuation for just after the await expression
        //     Note: the continuation will resume the state machine on the next state
        //     Note: the returned task will complete on return or on exception,
        //           but that is something the compiled state machine will handle
        foreach (var record in batchedRecords)
        {
            Console.WriteLine("Dispatched record {0} ({1})", record.N, sw.ElapsedMilliseconds);
            // I used Task.Yield as a replacement for actual work
            //
            // It'll force the async state machine to always return here
            // and shedule a continuation that reenters the async state machine right afterwards
            //
            // This is not something you usually want on production code,
            // so please replace this with the actual dispatch
            await Task.Yield();
        }
    }
}

public class Program
{
    public static void Main()
    {
        // Our main entry point is synchronous, so we run an async entry point and wait on it
        //
        // The difference between MainAsync().Result and MainAsync().GetAwaiter().GetResult()
        // is in the way exceptions are thrown:
        // - the former aggregates exceptions, throwing an AggregateException
        // - the latter doesn't aggregate exceptions if it doesn't have to, throwing the actual exception
        //
        // Since I'm not combining tasks (e.g. Task.WhenAll), I'm not expecting multiple exceptions
        //
        // If my main method returned int, I could return the task's result
        // and I'd make MainAsync return Task<int> instead of just Task
        MainAsync().GetAwaiter().GetResult();
    }

    // Async entry point
    public static async Task MainAsync()
    {
        var receiver = new RecordReceiver();
        // I'll provide a few records: 
        // - a delay big enough between the 1st and the 2nd such that the 1st will be dispatched
        // - 8 records in a row, such that 5 of them will be dispatched, and 3 of them will wait
        // - again, a delay big enough that will provoke the last 3 records to be dispatched
        // - and a final record, which will wait to be dispatched
        //
        // We await for Task.Delay between providing records,
        // but we'll await for the records in the end only
        //
        // That is, we'll not await each record before the next,
        // as that would mean each record would only be dispatched after at least the timeout
        var t1 = receiver.ReceiveAsync(new Record(1));
        await Task.Delay(TimeSpan.FromMilliseconds(300));
        var t2 = receiver.ReceiveAsync(new Record(2));
        var t3 = receiver.ReceiveAsync(new Record(3));
        var t4 = receiver.ReceiveAsync(new Record(4));
        var t5 = receiver.ReceiveAsync(new Record(5));
        var t6 = receiver.ReceiveAsync(new Record(6));
        var t7 = receiver.ReceiveAsync(new Record(7));
        var t8 = receiver.ReceiveAsync(new Record(8));
        var t9 = receiver.ReceiveAsync(new Record(9));
        await Task.Delay(TimeSpan.FromMilliseconds(300));
        var t10 = receiver.ReceiveAsync(new Record(10));
        // I probably should have used a list of records, but this is just an example
        await Task.WhenAll(t1, t2, t3, t4, t5, t6, t7, t8, t9, t10);
    }
}

You can make this more interesting, like returning a distinct task, such as Task<RecordDispatchReport>, from ReceiveAsync which is completed by the processing part of DispatchRecords, using a TaskCompletionSource for each record.

like image 116
acelent Avatar answered Jan 20 '23 17:01

acelent