
"Chunked" MemoryStream

Tags:

c#

.net

I'm looking for an implementation of MemoryStream that does not allocate memory as one big block, but rather as a collection of chunks. I want to store a few GB of data in memory (64-bit) and avoid the limitations of memory fragmentation.

Karol Kolenda asked Jul 29 '09

People also ask

What is the difference between MemoryStream and FileStream?

As the name suggests, a FileStream reads and writes to a file whereas a MemoryStream reads and writes to the memory. So it relates to where the stream is stored.

What is a MemoryStream?

MemoryStream encapsulates data stored as an unsigned byte array. The encapsulated data is directly accessible in memory. Memory streams can reduce the need for temporary buffers and files in an application. The current position of a stream is the position at which the next read or write operation takes place.

How do I reuse MemoryStream?

You can reuse a MemoryStream by setting its Position to 0 and its Length to 0. Setting the length to 0 does not clear the existing buffer; it only resets the internal counters.
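That reset can be sketched as follows (a minimal illustration, not from the original answer; note the read-only Length property is changed via SetLength):

```csharp
using System;
using System.IO;
using System.Text;

class ReuseDemo
{
    static void Main()
    {
        var ms = new MemoryStream();
        ms.Write(Encoding.UTF8.GetBytes("hello"), 0, 5);

        // reuse: rewind and truncate; the internal buffer is kept, not cleared
        ms.Position = 0;
        ms.SetLength(0);

        ms.Write(Encoding.UTF8.GetBytes("again!"), 0, 6);
        Console.WriteLine(ms.Length);   // 6
    }
}
```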


4 Answers

You first need to determine whether virtual address space fragmentation is actually the problem.

If you are on a 64 bit machine (which you seem to indicate you are), I seriously doubt it is. Each 64 bit process has almost the entire 64 bit virtual address space available, and your only worry is virtual address space fragmentation, not physical memory fragmentation (which is what the operating system must worry about). The OS memory manager already pages memory under the covers. For the foreseeable future you will not run out of virtual address space before you run out of physical memory, and that is unlikely to change before we both retire.

If you have a 32 bit address space, then allocating large contiguous blocks of memory in the GB range will run into fragmentation problems quite quickly. There is no stock chunk-allocating memory stream in the CLR. There is one under the covers in ASP.NET (for other reasons), but it is not accessible. If you must travel this path, you are probably better off writing one yourself anyway, because your application's usage pattern is unlikely to be similar to many others', and trying to fit your data into a 32 bit address space will likely be your performance bottleneck.

I highly recommend requiring a 64 bit process if you are manipulating GBs of data. It will do a much better job than hand-rolled workarounds for 32 bit address space fragmentation, regardless of how clever you are.
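As a quick sanity check before hand-rolling anything, .NET can report whether the current process is actually 64-bit (a sketch; note that on .NET Framework, single arrays larger than 2 GB additionally require the `gcAllowVeryLargeObjects` configuration setting):

```csharp
using System;

class BitnessCheck
{
    static void Main()
    {
        // IntPtr.Size is 8 in a 64-bit process and 4 in a 32-bit one
        Console.WriteLine("64-bit process: " + Environment.Is64BitProcess);
        Console.WriteLine("64-bit OS:      " + Environment.Is64BitOperatingSystem);
        Console.WriteLine("Pointer size:   " + IntPtr.Size);
    }
}
```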

chuckj answered Oct 13 '22


Something like this:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;

class ChunkedMemoryStream : Stream
{
    private readonly List<byte[]> _chunks = new List<byte[]>();
    private int _positionChunk;
    private int _positionOffset;
    private long _position;

    public override bool CanRead
    {
        get { return true; }
    }

    public override bool CanSeek
    {
        get { return true; }
    }

    public override bool CanWrite
    {
        get { return true; }
    }

    public override void Flush() { }

    public override long Length
    {
        get { return _chunks.Sum(c => c.Length); }
    }

    public override long Position
    {
        get
        {
            return _position;
        }
        set
        {
            _position = value;

            // walk the chunk list to locate the chunk/offset for the new position
            _positionChunk = 0;
            long remaining = value;

            while (remaining != 0)
            {
                if (_positionChunk >= _chunks.Count)
                    throw new OverflowException();

                if (remaining < _chunks[_positionChunk].Length)
                    break;

                remaining -= _chunks[_positionChunk].Length;
                _positionChunk++;
            }

            _positionOffset = (int)remaining;
        }
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        int result = 0;
        while ((count != 0) && (_positionChunk != _chunks.Count))
        {
            int fromChunk = Math.Min(count, _chunks[_positionChunk].Length - _positionOffset);
            if (fromChunk != 0)
            {
                Array.Copy(_chunks[_positionChunk], _positionOffset, buffer, offset, fromChunk);
                offset += fromChunk;
                count -= fromChunk;
                result += fromChunk;
                _position += fromChunk;
                _positionOffset += fromChunk;
            }

            // only advance once the current chunk is fully consumed
            if (_positionOffset == _chunks[_positionChunk].Length)
            {
                _positionOffset = 0;
                _positionChunk++;
            }
        }
        return result;
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        long newPos = 0;

        switch (origin)
        {
            case SeekOrigin.Begin:
                newPos = offset;
                break;
            case SeekOrigin.Current:
                newPos = Position + offset;
                break;
            case SeekOrigin.End:
                // offset is relative to the end, so it is normally negative
                newPos = Length + offset;
                break;
        }

        Position = Math.Max(0, Math.Min(newPos, Length));
        return Position;
    }

    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        while ((count != 0) && (_positionChunk != _chunks.Count))
        {
            int toChunk = Math.Min(count, _chunks[_positionChunk].Length - _positionOffset);
            if (toChunk != 0)
            {
                Array.Copy(buffer, offset, _chunks[_positionChunk], _positionOffset, toChunk);
                offset += toChunk;
                count -= toChunk;
                _position += toChunk;
                _positionOffset += toChunk;
            }

            // only advance once the current chunk is fully overwritten
            if (_positionOffset == _chunks[_positionChunk].Length)
            {
                _positionOffset = 0;
                _positionChunk++;
            }
        }

        if (count != 0)
        {
            byte[] chunk = new byte[count];
            Array.Copy(buffer, offset, chunk, 0, count);
            _chunks.Add(chunk);
            _positionChunk = _chunks.Count;
            _positionOffset = 0;
            _position += count;
        }
    }
}

class Program
{
    static void Main(string[] args)
    {
        ChunkedMemoryStream cms = new ChunkedMemoryStream();

        Debug.Assert(cms.Length == 0);
        Debug.Assert(cms.Position == 0);

        cms.Position = 0;

        byte[] helloworld = Encoding.UTF8.GetBytes("hello world");

        cms.Write(helloworld, 0, 3);
        cms.Write(helloworld, 3, 3);
        cms.Write(helloworld, 6, 5);

        Debug.Assert(cms.Length == 11);
        Debug.Assert(cms.Position == 11);

        cms.Position = 0;

        byte[] b = new byte[20];
        cms.Read(b, 3, (int)cms.Length);
        Debug.Assert(b.Skip(3).Take(11).SequenceEqual(helloworld));

        cms.Position = 0;
        cms.Write(Encoding.UTF8.GetBytes("seeya"), 0, 5);

        Debug.Assert(cms.Length == 11);
        Debug.Assert(cms.Position == 5);

        cms.Position = 0;
        cms.Read(b, 0, (int)cms.Length);
        Debug.Assert(b.Take(11).SequenceEqual(Encoding.UTF8.GetBytes("seeya world")));

        Debug.Assert(cms.Length == 11);
        Debug.Assert(cms.Position == 11);

        cms.Write(Encoding.UTF8.GetBytes(" again"), 0, 6);

        Debug.Assert(cms.Length == 17);
        Debug.Assert(cms.Position == 17);

        cms.Position = 0;
        cms.Read(b, 0, (int)cms.Length);
        Debug.Assert(b.Take(17).SequenceEqual(Encoding.UTF8.GetBytes("seeya world again")));

    }
}
Daniel Earwicker answered Oct 13 '22


The Bing team has released RecyclableMemoryStream and wrote about it here. The benefits they cite are:

  1. Eliminate Large Object Heap allocations by using pooled buffers
  2. Incur far fewer gen 2 GCs, and spend far less time paused due to GC
  3. Avoid memory leaks by having a bounded pool size
  4. Avoid memory fragmentation
  5. Provide excellent debuggability
  6. Provide metrics for performance tracking
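Usage is close to a drop-in replacement for MemoryStream; a rough sketch, assuming the Microsoft.IO.RecyclableMemoryStream NuGet package is referenced:

```csharp
using Microsoft.IO;

static class PooledStreams
{
    // one manager per process; it owns the shared buffer pools
    private static readonly RecyclableMemoryStreamManager Manager =
        new RecyclableMemoryStreamManager();

    public static void Demo(byte[] payload)
    {
        // GetStream hands out a pooled, chunked stream;
        // Dispose returns its buffers to the pool instead of abandoning them to the GC
        using (var stream = Manager.GetStream())
        {
            stream.Write(payload, 0, payload.Length);
            stream.Position = 0;
            // ... read back or copy to another stream here ...
        }
    }
}
```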
EricLaw answered Oct 13 '22


I ran into a similar problem in my application. I was reading a large amount of compressed data and suffered an OutOfMemoryException using MemoryStream, so I wrote my own implementation of a "chunked" memory stream backed by a collection of byte arrays. If you have any ideas for making it more efficient, please let me know.

using System;
using System.Collections.ObjectModel;
using System.Diagnostics.Contracts;
using System.IO;
using System.Linq;

public sealed class ChunkedMemoryStream : Stream
{
    #region Constants

    private const int BUFFER_LENGTH = 65536;
    private const byte ONE = 1;
    private const byte ZERO = 0;

    #endregion

    #region Readonly & Static Fields

    private readonly Collection<byte[]> _chunks;

    #endregion

    #region Fields

    private long _length;

    private long _position;
    private const byte TWO = 2;

    #endregion

    #region C'tors

    public ChunkedMemoryStream()
    {
        _chunks = new Collection<byte[]> { new byte[BUFFER_LENGTH], new byte[BUFFER_LENGTH] };
        _position = ZERO;
        _length = ZERO;
    }

    #endregion

    #region Instance Properties

    public override bool CanRead
    {
        get { return true; }
    }

    public override bool CanSeek
    {
        get { return true; }
    }

    public override bool CanWrite
    {
        get { return true; }
    }

    public override long Length
    {
        get { return _length; }
    }

    public override long Position
    {
        get { return _position; }
        set
        {
            if (!CanSeek)
                throw new NotSupportedException();

            _position = value;

            // a position equal to the length (one past the last byte) is valid
            if (_position > _length)
                _position = _length;
        }
    }


    private byte[] CurrentChunk
    {
        get
        {
            long positionDividedByBufferLength = _position / BUFFER_LENGTH;
            var chunkIndex = Convert.ToInt32(positionDividedByBufferLength);
            byte[] chunk = _chunks[chunkIndex];
            return chunk;
        }
    }

    private int PositionInChunk
    {
        get
        {
            int positionInChunk = Convert.ToInt32(_position % BUFFER_LENGTH);
            return positionInChunk;
        }
    }

    private int RemainingBytesInCurrentChunk
    {
        get
        {
            Contract.Ensures(Contract.Result<int>() > ZERO);
            int remainingBytesInCurrentChunk = CurrentChunk.Length - PositionInChunk;
            return remainingBytesInCurrentChunk;
        }
    }

    #endregion

    #region Instance Methods

    public override void Flush()
    {
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        if (buffer == null)
            throw new ArgumentNullException("buffer");

        if (offset < ZERO || count < ZERO)
            throw new ArgumentOutOfRangeException();

        if (offset + count > buffer.Length)
            throw new ArgumentException();

        if (!CanRead)
            throw new NotSupportedException();

        int bytesToRead = count;
        if (_length - _position < bytesToRead)
            bytesToRead = Convert.ToInt32(_length - _position);

        int bytesRead = 0;
        while (bytesToRead > ZERO)
        {
            // copy at most the remainder of the current chunk
            int bytesFromCurrentChunk = RemainingBytesInCurrentChunk;
            if (bytesFromCurrentChunk > bytesToRead)
                bytesFromCurrentChunk = bytesToRead;
            Array.Copy(CurrentChunk, PositionInChunk, buffer, offset, bytesFromCurrentChunk);
            // move position in source
            _position += bytesFromCurrentChunk;
            // move position in target
            offset += bytesFromCurrentChunk;
            bytesToRead -= bytesFromCurrentChunk;
            // count bytes read
            bytesRead += bytesFromCurrentChunk;
        }
        return bytesRead;
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        switch (origin)
        {
            case SeekOrigin.Begin:
                Position = offset;
                break;
            case SeekOrigin.Current:
                Position += offset;
                break;
            case SeekOrigin.End:
                Position = Length + offset;
                break;
        }
        return Position;
    }

    private long Capacity
    {
        get
        {
            int numberOfChunks = _chunks.Count;


            long capacity = numberOfChunks * BUFFER_LENGTH;
            return capacity;
        }
    }

    public override void SetLength(long value)
    {
        if (value > _length)
        {
            while (value > Capacity)
            {
                var item = new byte[BUFFER_LENGTH];
                _chunks.Add(item);
            }
        }
        else if (value < _length)
        {
            // remove trailing chunks that are no longer needed,
            // but leave at least two chunks allocated
            while (Capacity - BUFFER_LENGTH >= value && _chunks.Count > TWO)
            {
                _chunks.RemoveAt(_chunks.Count - 1);
            }
        }
        _length = value;
        if (_position > _length)
            _position = _length;
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        if (!CanWrite)
            throw new NotSupportedException();

        int bytesToWrite = count;

        while (bytesToWrite > ZERO)
        {
            // get remaining space in current chunk, but never write more than requested
            int bytesToCurrentChunk = RemainingBytesInCurrentChunk;
            if (bytesToCurrentChunk > bytesToWrite)
                bytesToCurrentChunk = bytesToWrite;

            // write to the current chunk
            Array.Copy(buffer, offset, CurrentChunk, PositionInChunk, bytesToCurrentChunk);

            // advance source offset and stream position
            offset += bytesToCurrentChunk;
            bytesToWrite -= bytesToCurrentChunk;
            _position += bytesToCurrentChunk;

            // the stream only grows when writing past the current end
            if (_position > _length)
                _length = _position;

            // allocate a fresh chunk once the current capacity is exhausted
            if (Capacity == _position)
                _chunks.Add(new byte[BUFFER_LENGTH]);
        }
    }

    /// <summary>
    ///     Gets entire content of stream regardless of Position value and return output as byte array
    /// </summary>
    /// <returns>byte array</returns>
    public byte[] ToArray()
    {
        var outputArray = new byte[Length];
        if (outputArray.Length != ZERO)
        {
            long outputPosition = ZERO;
            foreach (byte[] chunk in _chunks)
            {
                var remainingLength = (Length - outputPosition) > chunk.Length
                                          ? chunk.Length
                                          : Length - outputPosition;
                Array.Copy(chunk, ZERO, outputArray, outputPosition, remainingLength);
                outputPosition = outputPosition + remainingLength;
            }
        }
        return outputArray;
    }

    /// <summary>
    ///     Method set Position to first element and write entire stream to another
    /// </summary>
    /// <param name="stream">Target stream</param>
    public void WriteTo(Stream stream)
    {
        Contract.Requires(stream != null);

        Position = ZERO;
        var buffer = new byte[BUFFER_LENGTH];
        int bytesRead;
        do
        {
            bytesRead = Read(buffer, ZERO, BUFFER_LENGTH);
            stream.Write(buffer, ZERO, bytesRead);
        } while (bytesRead > ZERO);
    }

    #endregion
}
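For completeness, a hypothetical round trip through the class above might look like this (illustration only, assuming the ChunkedMemoryStream class is in scope):

```csharp
using System.Text;

var cms = new ChunkedMemoryStream();
byte[] data = Encoding.UTF8.GetBytes("hello chunked world");

// write the payload, rewind, and extract it as a single array
cms.Write(data, 0, data.Length);
cms.Position = 0;

byte[] roundTrip = cms.ToArray();
// roundTrip now holds the same bytes that were written
```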
Rudolf Dvoracek answered Oct 12 '22