
FIFO/Queue buffer specialising in byte streams

Is there any .NET data structure/combination of classes that allows for byte data to be appended to the end of a buffer but all peeks and reads are from the start, shortening the buffer when I read?

The MemoryStream class seems to do part of this, but I need to maintain separate locations for reading and writing, and it doesn't automatically discard the data at the start after it's read.

An answer posted in reply to this question does basically what I'm trying to do, but I'd prefer something that supports asynchronous I/O between different components of the same process, just like a normal pipe or even a network stream (I need to filter/process the data first).

Asked by Deanna on Nov 22 '11 at 02:11


2 Answers

I'll post a stripped-down copy of some logic I wrote for a project at work. The advantage of this version is that it works with a linked list of buffered data, so you don't have to hold huge amounts of memory in one buffer or copy memory around when reading. Furthermore, it's thread-safe and behaves like a network stream: when there is no data available, a read waits until data arrives or the timeout expires, and when you ask for x bytes but only y bytes are available, it returns after reading those y bytes. I hope this helps!

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;

public class SlidingStream : Stream
{
    #region Other stream member implementations

    ...

    #endregion Other stream member implementations

    public SlidingStream()
    {
        ReadTimeout = -1; // -1 (Timeout.Infinite) waits indefinitely for data
    }

    // A single lock guards both the segment list and the event, because
    // LinkedList<T> is not safe for concurrent reads and writes.
    private readonly object _syncRoot = new object();
    private readonly LinkedList<ArraySegment<byte>> _pendingSegments = new LinkedList<ArraySegment<byte>>();
    private readonly ManualResetEventSlim _dataAvailableResetEvent = new ManualResetEventSlim();

    public override int ReadTimeout { get; set; }

    public override int Read(byte[] buffer, int offset, int count)
    {
        if (count == 0)
            return 0;

        while (true)
        {
            // Wait returns false when the timeout expires before data is signalled.
            if (!_dataAvailableResetEvent.Wait(ReadTimeout))
                throw new TimeoutException("No data available");

            lock (_syncRoot)
            {
                int read = 0;

                while (read < count && _pendingSegments.Count > 0)
                {
                    ArraySegment<byte> segment = _pendingSegments.First.Value;
                    _pendingSegments.RemoveFirst();

                    // Copy as much of this segment as the caller still has room for.
                    int toCopy = Math.Min(segment.Count, count - read);
                    Buffer.BlockCopy(segment.Array, segment.Offset, buffer, offset + read, toCopy);
                    read += toCopy;

                    // Put the unread tail of a partially consumed segment back at the front.
                    if (toCopy < segment.Count)
                    {
                        _pendingSegments.AddFirst(new ArraySegment<byte>(segment.Array, segment.Offset + toCopy, segment.Count - toCopy));
                    }
                }

                if (_pendingSegments.Count == 0)
                {
                    _dataAvailableResetEvent.Reset();
                }

                // Another reader may have drained the list first; if so, wait again.
                if (read > 0)
                    return read;
            }
        }
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        if (count == 0)
            return; // nothing to queue

        // Copy the caller's data so later changes to 'buffer' cannot affect queued bytes.
        byte[] copy = new byte[count];
        Buffer.BlockCopy(buffer, offset, copy, 0, count);

        lock (_syncRoot)
        {
            _pendingSegments.AddLast(new ArraySegment<byte>(copy));
            _dataAvailableResetEvent.Set();
        }
    }
}
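
Because a stream with network-stream semantics may return fewer bytes than requested, callers that need an exact number of bytes usually loop over Read. The following is a minimal sketch of such a loop; the class name `StreamReadSketch` and the method `ReadFully` are hypothetical names chosen for illustration and are not part of the answer above. It works against any `Stream`.

```csharp
using System;
using System.IO;

public static class StreamReadSketch
{
    // Hypothetical helper: keeps calling Read until 'count' bytes have arrived.
    // Throws EndOfStreamException if the stream ends first (Read returns 0).
    public static void ReadFully(Stream stream, byte[] buffer, int offset, int count)
    {
        int total = 0;
        while (total < count)
        {
            // Read may deliver fewer bytes than requested, so advance by what was actually read.
            int n = stream.Read(buffer, offset + total, count - total);
            if (n == 0)
                throw new EndOfStreamException("Stream ended before " + count + " bytes were read.");
            total += n;
        }
    }
}
```

A call such as `StreamReadSketch.ReadFully(stream, header, 0, header.Length)` blocks until the whole header is available; any exception thrown by the underlying Read, such as a timeout, propagates to the caller. Newer runtimes (.NET 7 and later) provide `Stream.ReadExactly` for the same purpose.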
Answered by Polity on Sep 18 '22 at 13:09


The code can be simpler than in the accepted answer; there is no need for a for loop:

/// <summary>
/// This class is a very fast and threadsafe FIFO buffer
/// </summary>
public class FastFifo
{
    // readonly: this list is also the lock object, so it must never be reassigned
    private readonly List<Byte> mi_FifoData = new List<Byte>();

    /// <summary>
    /// Get the count of bytes in the Fifo buffer
    /// </summary>
    public int Count
    {
        get 
        { 
            lock (mi_FifoData)
            {
                return mi_FifoData.Count; 
            }
        }
    }

    /// <summary>
    /// Clears the Fifo buffer
    /// </summary>
    public void Clear()
    {
        lock (mi_FifoData)
        {
            mi_FifoData.Clear();
        }
    }

    /// <summary>
    /// Append data to the end of the fifo
    /// </summary>
    public void Push(Byte[] u8_Data)
    {
        lock (mi_FifoData)
        {
            // Internally the .NET framework uses Array.Copy() which is extremely fast
            mi_FifoData.AddRange(u8_Data);
        }
    }

    /// <summary>
    /// Get data from the beginning of the fifo.
    /// returns null if s32_Count bytes are not yet available.
    /// </summary>
    public Byte[] Pop(int s32_Count)
    {
        lock (mi_FifoData)
        {
            if (mi_FifoData.Count < s32_Count)
                return null;

            // CopyTo uses Array.Copy() internally, which is extremely fast
            Byte[] u8_PopData = new Byte[s32_Count];
            mi_FifoData.CopyTo(0, u8_PopData, 0, s32_Count);
            // RemoveRange(0, n) shifts all remaining bytes to the front of the list
            mi_FifoData.RemoveRange(0, s32_Count);
            return u8_PopData;
        }
    }

    /// <summary>
    /// Gets a byte without removing it from the Fifo buffer
    /// returns -1 if the index is invalid
    /// </summary>
    public int PeekAt(int s32_Index)
    {
        lock (mi_FifoData)
        {
            if (s32_Index < 0 || s32_Index >= mi_FifoData.Count)
                return -1;

            return mi_FifoData[s32_Index];
        }
    }
}
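
One cost of the list-based approach is that `List<T>.RemoveRange(0, n)` moves every remaining byte to the front on each pop. If that matters for large backlogs, a circular (ring) buffer avoids the shift by advancing a head index instead. The following is only a sketch of that alternative technique, not the answer's method; the class name `RingFifoSketch`, its members, and the initial capacity of 256 are assumptions made for illustration.

```csharp
using System;

// Hypothetical ring-buffer variant of a byte FIFO (a sketch, not from the answer above).
public class RingFifoSketch
{
    private readonly object _sync = new object();
    private byte[] _data = new byte[256]; // assumed initial capacity, doubles on demand
    private int _head;   // index of the oldest stored byte
    private int _count;  // number of bytes currently stored

    public int Count
    {
        get { lock (_sync) { return _count; } }
    }

    // Appends all bytes of 'source' to the end of the FIFO.
    public void Push(byte[] source)
    {
        lock (_sync)
        {
            EnsureCapacity(_count + source.Length);
            int tail = (_head + _count) % _data.Length;
            // Copy up to the end of the array, then wrap around to index 0.
            int first = Math.Min(source.Length, _data.Length - tail);
            Buffer.BlockCopy(source, 0, _data, tail, first);
            Buffer.BlockCopy(source, first, _data, 0, source.Length - first);
            _count += source.Length;
        }
    }

    // Removes and returns 'count' bytes from the start of the FIFO.
    // Returns null if 'count' is negative or more than is available.
    public byte[] Pop(int count)
    {
        lock (_sync)
        {
            if (count < 0 || count > _count)
                return null;

            byte[] result = new byte[count];
            CopyOut(result, count);
            // Advancing the head discards the bytes without moving the rest.
            _head = (_head + count) % _data.Length;
            _count -= count;
            return result;
        }
    }

    // Copies the oldest 'count' bytes into 'destination', handling wrap-around.
    private void CopyOut(byte[] destination, int count)
    {
        int first = Math.Min(count, _data.Length - _head);
        Buffer.BlockCopy(_data, _head, destination, 0, first);
        Buffer.BlockCopy(_data, 0, destination, first, count - first);
    }

    // Doubles the backing array until it can hold 'required' bytes.
    private void EnsureCapacity(int required)
    {
        if (required <= _data.Length)
            return;

        int newSize = _data.Length;
        while (newSize < required)
            newSize *= 2;

        byte[] grown = new byte[newSize];
        CopyOut(grown, _count);
        _data = grown;
        _head = 0;
    }
}
```

The trade-off is extra index bookkeeping in exchange for pops that copy only the bytes being returned. The sketch ignores integer overflow for very large buffers and has no peek method; both would need attention in real use.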
Answered by Elmue on Sep 17 '22 at 13:09