
How to aggregate the data from an async producer and write it to a file?

I'm learning about async/await patterns in C#. Currently I'm trying to solve a problem like this:

  • There is a producer (a hardware device) that generates 1000 packets per second. I need to log this data to a file.

  • The device only has a ReadAsync() method to report a single packet at a time.

  • I need to buffer the packets and write them in the order they are generated to the file, only once a second.

  • The write operation should fail if the previous write has not finished by the time the next batch of packets is ready to be written.

So far I have written something like the code below. It works, but I am not sure if this is the best way to solve the problem. Any comments or suggestions? What is the best practice for this kind of producer/consumer problem, where the consumer needs to aggregate the data received from the producer?

static async Task TestLogger(Device device, int seconds)
{
    const int bufLength = 1000;
    bool firstIteration = true;
    Task writerTask = null;

    using (var writer = new StreamWriter("test.log"))
    {
        do
        {
            var buffer = new byte[bufLength][];

            for (int i = 0; i < bufLength; i++)
            {
                buffer[i] = await device.ReadAsync();
            }

            if (!firstIteration)
            {
                if (!writerTask.IsCompleted)
                    throw new Exception("Write Time Out!");
            }

            writerTask = Task.Run(() =>
                {
                    foreach (var b in buffer)
                        writer.WriteLine(ToHexString(b));
                });

            firstIteration = false;
        } while (--seconds > 0);

        // Let the final batch finish before the writer is disposed.
        await writerTask;
    }
}
asked Jun 05 '14 09:06 by AlefSin


2 Answers

You could use the following idea, provided the criterion for flushing is the number of packets (up to 1000). I did not test it. It makes use of Stephen Cleary's AsyncProducerConsumerQueue<T> featured in this question.

AsyncProducerConsumerQueue<byte[]> _queue;
Stream _stream;
Device _device; // the hardware device from the question

// producer
async Task ReceiveAsync(CancellationToken token)
{
    while (true)
    {
       var list = new List<byte>();
       while (true)
       {
           token.ThrowIfCancellationRequested();
           var packet = await _device.ReadAsync(token);
           list.Add(packet);
           if (list.Count == 1000)
               break;
       }
       // push next batch
       await _queue.EnqueueAsync(list.ToArray(), token);
    }
}

// consumer
async Task LogAsync(CancellationToken token)
{
    Task previousFlush = Task.FromResult(0); 
    CancellationTokenSource cts = null;
    while (true)
    {
       token.ThrowIfCancellationRequested();
       // get next batch
       var nextBatch = await _queue.DequeueAsync(token);
       if (!previousFlush.IsCompleted)
       {
           cts.Cancel(); // cancel the previous flush if not ready
           throw new Exception("failed to flush on time.");
       }
       await previousFlush; // it's completed, observe for any errors
       // start flushing
       cts = CancellationTokenSource.CreateLinkedTokenSource(token);
       previousFlush = _stream.WriteAsync(nextBatch, 0, nextBatch.Length, cts.Token);
    }
}

If you would rather not fail the logger, but instead cancel the late flush and proceed to the next batch, you can do so with a minimal change to this code.
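A minimal sketch of what such a change might look like, not tested against a real device. The names SkipOnTimeoutLogger and dequeueAsync are hypothetical: dequeueAsync stands in for _queue.DequeueAsync(token) and is assumed to return null when there are no more batches. Instead of throwing, the loop cancels the late flush, counts it, and carries on.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

static class SkipOnTimeoutLogger
{
    // dequeueAsync is a hypothetical stand-in for _queue.DequeueAsync(token).
    // Returns the number of flushes that were canceled for being late.
    public static async Task<int> LogAsync(
        Func<CancellationToken, Task<byte[]>> dequeueAsync,
        Stream stream,
        CancellationToken token)
    {
        Task previousFlush = Task.CompletedTask;
        CancellationTokenSource cts = null;
        int abandoned = 0;

        while (true)
        {
            token.ThrowIfCancellationRequested();
            byte[] nextBatch = await dequeueAsync(token);
            if (nextBatch == null)
                break; // assumed end-of-data marker

            if (!previousFlush.IsCompleted)
            {
                // Instead of throwing, cancel the late flush and count it.
                cts.Cancel();
                abandoned++;
            }

            try
            {
                // Wait for the previous flush to settle and observe its outcome,
                // so two writes never overlap on the same stream.
                await previousFlush;
            }
            catch (OperationCanceledException)
            {
                // Expected when the flush was canceled above.
            }
            finally
            {
                cts?.Dispose();
            }

            cts = CancellationTokenSource.CreateLinkedTokenSource(token);
            previousFlush = stream.WriteAsync(nextBatch, 0, nextBatch.Length, cts.Token);
        }

        await previousFlush; // let the final flush finish
        cts?.Dispose();
        return abandoned;
    }
}
```

Two caveats apply to this sketch: a canceled write may leave a partial batch in the file, and whether an in-flight WriteAsync actually stops when the token is canceled depends on the stream implementation.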

In response to @l3arnon's comment:

  "1. A packet is not a byte, it's byte[]. 2. You haven't used the OP's ToHexString. 3. AsyncProducerConsumerQueue is much less robust and tested than .Net's TPL Dataflow. 4. You await previousFlush for errors just after you throw an exception which makes that line redundant. etc. In short: I think the possible added value doesn't justify this very complicated solution."

  1. "A packet is not a byte, it's byte[]" - A packet is a byte; this is obvious from the OP's code: buffer[i] = await device.ReadAsync(). Then, a batch of packets is byte[].
  2. "You haven't used the OP's ToHexString." - The goal was to show how to use Stream.WriteAsync which natively accepts a cancellation token, instead of WriteLineAsync which doesn't allow cancellation. It's trivial to use ToHexString with Stream.WriteAsync and still take advantage of cancellation support:

    var hexBytes = Encoding.ASCII.GetBytes(ToHexString(nextBatch) +
        Environment.NewLine);
    previousFlush = _stream.WriteAsync(hexBytes, 0, hexBytes.Length, cts.Token);
    
  3. "AsyncProducerConsumerQueue is much less robust and tested than .Net's TPL Dataflow" - I don't think this is an established fact. However, if the OP is concerned about it, he can use a regular BlockingCollection, which doesn't block the producer thread. It's OK to block the consumer thread while waiting for the next batch, because writing is done in parallel. By contrast, your TPL Dataflow version carries one redundant CPU- and lock-intensive operation: moving data from the producer pipeline to the writer pipeline with logAction.Post(packet), byte by byte. My code doesn't do that.

  4. "You await previousFlush for errors just after you throw an exception which makes that line redundant." - This line is not redundant. Perhaps you're missing this point: previousFlush.IsCompleted can be true when previousFlush.IsFaulted or previousFlush.IsCanceled is also true. So await previousFlush is relevant there to observe any errors on the completed task (e.g., a write failure), which would otherwise be lost.
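To illustrate point 4, here is a small self-contained sketch (TaskStateDemo and DescribeAsync are illustrative names, not part of the answer's code). It shows that IsCompleted is true for faulted and canceled tasks too, and that awaiting the task is what surfaces the stored failure.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

static class TaskStateDemo
{
    // Reports how a task ended; only meaningful once IsCompleted is true.
    public static async Task<string> DescribeAsync(Task task)
    {
        if (!task.IsCompleted)
            return "still running";
        try
        {
            await task; // rethrows the stored exception or cancellation
            return "succeeded";
        }
        catch (OperationCanceledException)
        {
            return "canceled";
        }
        catch (IOException ex)
        {
            return "faulted: " + ex.Message;
        }
    }

    public static async Task Main()
    {
        Task faulted = Task.FromException(new IOException("disk full"));
        Task canceled = Task.FromCanceled(new CancellationToken(true));

        Console.WriteLine(faulted.IsCompleted);   // True, although the write failed
        Console.WriteLine(canceled.IsCompleted);  // True, although it was canceled
        Console.WriteLine(await DescribeAsync(Task.CompletedTask)); // succeeded
        Console.WriteLine(await DescribeAsync(faulted));            // faulted: disk full
        Console.WriteLine(await DescribeAsync(canceled));           // canceled
    }
}
```

Checking IsCompleted alone would treat all three tasks the same; only the await distinguishes a successful flush from a failed one.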

answered Nov 14 '22 19:11 by avo


Assuming you can batch by count (1000) instead of by time (1 second), the simplest solution is probably TPL Dataflow's BatchBlock, which automatically batches a flow of items by size:

async Task TestLogger(Device device, int seconds)
{
    var writer = new StreamWriter("test.log");
    var batch = new BatchBlock<byte[]>(1000);
    var logAction = new ActionBlock<byte[]>(
        packet =>
        {
            return writer.WriteLineAsync(ToHexString(packet));
        });
    // Declared and set to null first so the lambda below can refer to it.
    ActionBlock<byte[][]> transferAction = null;
    transferAction = new ActionBlock<byte[][]>(
        bytes =>
        {
            foreach (var packet in bytes)
            {
                if (transferAction.InputCount > 0)
                {
                    return; // or throw new Exception("Write Time Out!");
                }
                logAction.Post(packet);
            }
        }
    );

    batch.LinkTo(transferAction);
    logAction.Completion.ContinueWith(_ => writer.Dispose());

    while (true)
    {
        batch.Post(await device.ReadAsync());
    }
}
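The loop above never ends, so the blocks are never completed. A minimal sketch of how batching and shutdown could fit together for a bounded run follows, assuming the System.Threading.Tasks.Dataflow library is available. BatchBlockDemo and CollectBatchSizesAsync are illustrative names, and the one-byte packets are a stand-in for await device.ReadAsync().

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class BatchBlockDemo
{
    // Posts packetCount dummy packets and returns the size of each batch received.
    public static async Task<List<int>> CollectBatchSizesAsync(int packetCount, int batchSize)
    {
        var sizes = new List<int>();
        var batch = new BatchBlock<byte[]>(batchSize);

        // An ActionBlock handles one message at a time by default,
        // so the list needs no extra locking here.
        var sink = new ActionBlock<byte[][]>(packets => sizes.Add(packets.Length));

        // Forward completion so that completing the batch block also completes the sink.
        batch.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < packetCount; i++)
            batch.Post(new byte[] { (byte)i }); // stand-in for await device.ReadAsync()

        batch.Complete();      // releases any remaining packets as a final, smaller batch
        await sink.Completion; // a writer could safely be disposed after this point
        return sizes;
    }
}
```

For example, CollectBatchSizesAsync(2500, 1000) yields batches of 1000, 1000 and 500 packets, the last one being flushed by Complete().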
answered Nov 14 '22 18:11 by i3arnon