I'm having trouble assimilating the c# Task, async and await patterns.
Windows service, .NET v4.5.2 server-side.
I have a Windows service accepting a variety of sources of incoming records, arriving externally ad-hoc via a self-hosted web api. I would like to batch up these records and then forward them on to another service. If the number of batched records exceeds a threshold, the batch should be dispatched immediately. Furthermore, the batch as it stands should also be dispatched if a time interval has elapsed. This means that a record is never held for more than N seconds.
I'm struggling to fit this into a Task based async pattern.
In days gone by, I would have created a Thread, a ManualResetEvent and a System.Threading.Timer. The Thread would loop around a Wait on the reset event. The Timer would set the event when fired, as would the code doing the aggregation when the batch size exceeded the threshold. Following the Wait, the Thread would stop the Timer, do the dispatch (an HTTP Post), reset the Timer and clear the ManualResetEvent, the loop back and Wait.
However, I am seeing folk say that this is 'bad' as the Wait just blocks a valuable thread resource, and that async/await is my panacea.
First off, are they right? Is my way out-of-date and inefficient or can I JFDI?
I've found examples here for batching and here for tasks at intervals, but not a combination of the two.
Is this requirement actually compatible with async/await?
Actually, you're almost doing the right thing, and they are also partially right.
What you should know is that you should avoid idle threads, with long waiting on events or waiting for I/O to complete (waiting on locks with few contention and fast statement blocks or spinning loops with compare-and-swap are usually OK).
What most of them don't know is that tasks are not magic, for instance, Task.Delay
uses a Timer
(more exactly, a System.Threading.Timer
) and waiting on a non-complete task ends up using a ManualResetEventSlim
(an improvement over ManualResetEvent
, as it doesn't create a Win32 event unless explicitly asked for, e.g. ((IAsyncResult)task).AsyncWaitHandle
).
So yes, your requirements are achievable with async/await, or tasks in general.
Runnable example at .NET Fiddle:
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
public class Record
{
private int n;
public Record(int n)
{
this.n = n;
}
public int N { get { return n; } }
}
public class RecordReceiver
{
// Arbitrary constants
// You should fetch value from configuration and define sensible defaults
private static readonly int threshold = 5;
// I chose a low value so the example wouldn't timeout in .NET Fiddle
private static readonly TimeSpan timeout = TimeSpan.FromMilliseconds(100);
// I'll use a Stopwatch to trace execution times
private readonly Stopwatch sw = Stopwatch.StartNew();
// Using a separate private object for locking
private readonly object lockObj = new object();
// The list of accumulated records to execute in a batch
private List<Record> records = new List<Record>();
// The most recent TCS to signal completion when:
// - the list count reached the threshold
// - enough time has passed
private TaskCompletionSource<IEnumerable<Record>> batchTcs;
// A CTS to cancel the timer-based task when the threshold is reached
// Not strictly necessary, but it reduces resource usage
private CancellationTokenSource delayCts;
// The task that will be completed when a batch of records has been dispatched
private Task dispatchTask;
// This method doesn't use async/await,
// because we're not doing an async flow here.
public Task ReceiveAsync(Record record)
{
Console.WriteLine("Received record {0} ({1})", record.N, sw.ElapsedMilliseconds);
lock (lockObj)
{
// When the list of records is empty, set up the next task
//
// TaskCompletionSource is just what we need, we'll complete a task
// not when we've finished some computation, but when we reach some criteria
//
// This is the main reason this method doesn't use async/await
if (records.Count == 0)
{
// I want the dispatch task to run on the thread pool
// In .NET 4.6, there's TaskCreationOptions.RunContinuationsAsynchronously
// .NET 4.6
//batchTcs = new TaskCompletionSource<IEnumerable<Record>>(TaskCreationOptions.RunContinuationsAsynchronously);
//dispatchTask = DispatchRecordsAsync(batchTcs.Task);
// Previously, we have to set up a continuation task using the default task scheduler
// .NET 4.5.2
batchTcs = new TaskCompletionSource<IEnumerable<Record>>();
var asyncContinuationsTask = batchTcs.Task
.ContinueWith(bt => bt.Result, TaskScheduler.Default);
dispatchTask = DispatchRecordsAsync(asyncContinuationsTask);
// Create a cancellation token source to be able to cancel the timer
//
// To be used when we reach the threshold, to release timer resources
delayCts = new CancellationTokenSource();
Task.Delay(timeout, delayCts.Token)
.ContinueWith(
dt =>
{
// When we hit the timer, take the lock and set the batch
// task as complete, moving the current records to its result
lock (lockObj)
{
// Avoid dispatching an empty list of records
//
// Also avoid a race condition by checking the cancellation token
//
// The race would be for the actual timer function to start before
// we had a chance to cancel it
if ((records.Count > 0) && !delayCts.IsCancellationRequested)
{
batchTcs.TrySetResult(new List<Record>(records));
records.Clear();
}
}
},
// Since our continuation function is fast, we want it to run
// ASAP on the same thread where the actual timer function runs
//
// Note: this is just a hint, but I trust it'll be favored most of the time
TaskContinuationOptions.ExecuteSynchronously);
// Remember that we want our batch task to have continuations
// running outside the timer thread, since dispatching records
// is probably too much work for a timer thread.
}
// Actually store the new record somewhere
records.Add(record);
// When we reach the threshold, set the batch task as complete,
// moving the current records to its result
//
// Also, cancel the timer task
if (records.Count >= threshold)
{
batchTcs.TrySetResult(new List<Record>(records));
delayCts.Cancel();
records.Clear();
}
// Return the last saved dispatch continuation task
//
// It'll start after either the timer or the threshold,
// but more importantly, it'll complete after it dispatches all records
return dispatchTask;
}
}
// This method uses async/await, since we want to use the async flow
internal async Task DispatchRecordsAsync(Task<IEnumerable<Record>> batchTask)
{
// We expect it to return a task right here, since the batch task hasn't had
// a chance to complete when the first record arrives
//
// Task.ConfigureAwait(false) allows us to run synchronously and on the same thread
// as the completer, but again, this is just a hint
//
// Remember we've set our task to run completions on the thread pool?
//
// With .NET 4.6, completing a TaskCompletionSource created with
// TaskCreationOptions.RunContinuationsAsynchronously will start scheduling
// continuations either on their captured SynchronizationContext or TaskScheduler,
// or forced to use TaskScheduler.Default
//
// Before .NET 4.6, completing a TaskCompletionSource could mean
// that continuations ran withing the completer, especially when
// Task.ConfigureAwait(false) was used on an async awaiter, or when
// Task.ContinueWith(..., TaskContinuationOptions.ExecuteSynchronously) was used
// to set up a continuation
//
// That's why, before .NET 4.6, we need to actually run a task for that effect,
// and we used Task.ContinueWith without TaskContinuationOptions.ExecuteSynchronously
// and with TaskScheduler.Default, to ensure it gets scheduled
//
// So, why am I using Task.ConfigureAwait(false) here anyway?
// Because it'll make a difference if this method is run from within
// a Windows Forms or WPF thread, or any thread with a SynchronizationContext
// or TaskScheduler that schedules tasks on a dedicated thread
var batchedRecords = await batchTask.ConfigureAwait(false);
// Async methods are transformed into state machines,
// much like iterator methods, but with async specifics
//
// What await actually does is:
// - check if the awaitable is complete
// - if so, continue executing
// Note: if every awaited awaitable is complete along an async method,
// the method will complete synchronously
// This is only expectable with tasks that have already completed
// or I/O that is always ready, e.g. MemoryStream
// - if not, return a task and schedule a continuation for just after the await expression
// Note: the continuation will resume the state machine on the next state
// Note: the returned task will complete on return or on exception,
// but that is something the compiled state machine will handle
foreach (var record in batchedRecords)
{
Console.WriteLine("Dispatched record {0} ({1})", record.N, sw.ElapsedMilliseconds);
// I used Task.Yield as a replacement for actual work
//
// It'll force the async state machine to always return here
// and shedule a continuation that reenters the async state machine right afterwards
//
// This is not something you usually want on production code,
// so please replace this with the actual dispatch
await Task.Yield();
}
}
}
public class Program
{
public static void Main()
{
// Our main entry point is synchronous, so we run an async entry point and wait on it
//
// The difference between MainAsync().Result and MainAsync().GetAwaiter().GetResult()
// is in the way exceptions are thrown:
// - the former aggregates exceptions, throwing an AggregateException
// - the latter doesn't aggregate exceptions if it doesn't have to, throwing the actual exception
//
// Since I'm not combining tasks (e.g. Task.WhenAll), I'm not expecting multiple exceptions
//
// If my main method returned int, I could return the task's result
// and I'd make MainAsync return Task<int> instead of just Task
MainAsync().GetAwaiter().GetResult();
}
// Async entry point
public static async Task MainAsync()
{
var receiver = new RecordReceiver();
// I'll provide a few records:
// - a delay big enough between the 1st and the 2nd such that the 1st will be dispatched
// - 8 records in a row, such that 5 of them will be dispatched, and 3 of them will wait
// - again, a delay big enough that will provoke the last 3 records to be dispatched
// - and a final record, which will wait to be dispatched
//
// We await for Task.Delay between providing records,
// but we'll await for the records in the end only
//
// That is, we'll not await each record before the next,
// as that would mean each record would only be dispatched after at least the timeout
var t1 = receiver.ReceiveAsync(new Record(1));
await Task.Delay(TimeSpan.FromMilliseconds(300));
var t2 = receiver.ReceiveAsync(new Record(2));
var t3 = receiver.ReceiveAsync(new Record(3));
var t4 = receiver.ReceiveAsync(new Record(4));
var t5 = receiver.ReceiveAsync(new Record(5));
var t6 = receiver.ReceiveAsync(new Record(6));
var t7 = receiver.ReceiveAsync(new Record(7));
var t8 = receiver.ReceiveAsync(new Record(8));
var t9 = receiver.ReceiveAsync(new Record(9));
await Task.Delay(TimeSpan.FromMilliseconds(300));
var t10 = receiver.ReceiveAsync(new Record(10));
// I probably should have used a list of records, but this is just an example
await Task.WhenAll(t1, t2, t3, t4, t5, t6, t7, t8, t9, t10);
}
}
You can make this more interesting, like returning a distinct task, such as Task<RecordDispatchReport>
, from ReceiveAsync
which is completed by the processing part of DispatchRecords
, using a TaskCompletionSource
for each record.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With