I'm looking for an implementation of MemoryStream which does not allocate memory as one big block, but rather as a collection of chunks. I want to store a few GB of data in memory (64 bit) and avoid the limitations of memory fragmentation.
As the name suggests, a FileStream reads and writes to a file, whereas a MemoryStream reads and writes to memory. So it relates to where the stream is stored.
MemoryStream encapsulates data stored as an unsigned byte array. The encapsulated data is directly accessible in memory. Memory streams can reduce the need for temporary buffers and files in an application. The current position of a stream is the position at which the next read or write operation takes place.
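A minimal sketch contrasting the two (the file name demo.bin is illustrative):

using System;
using System.IO;
using System.Text;

class StreamBackingDemo
{
    static void Main()
    {
        byte[] data = Encoding.UTF8.GetBytes("hello");

        // MemoryStream: backed by a byte array held in memory.
        using (var ms = new MemoryStream())
        {
            ms.Write(data, 0, data.Length);
            Console.WriteLine(ms.Position); // 5 - the position advanced past the written bytes
        }

        // FileStream: backed by a file on disk.
        using (var fs = new FileStream("demo.bin", FileMode.Create))
        {
            fs.Write(data, 0, data.Length);
        }
    }
}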
You can re-use the MemoryStream by setting the Position to 0 and the Length to 0. Setting the length to 0 does not clear the existing buffer; it only resets the internal counters.
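For example, a sketch of reusing one MemoryStream across iterations (the loop body is illustrative):

using System.IO;
using System.Text;

class ReuseDemo
{
    static void Main()
    {
        var ms = new MemoryStream();
        for (int i = 0; i < 3; i++)
        {
            // Rewind and truncate; the already-allocated buffer is kept and reused.
            ms.Position = 0;
            ms.SetLength(0);

            byte[] payload = Encoding.UTF8.GetBytes("iteration " + i);
            ms.Write(payload, 0, payload.Length);
        }
    }
}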
You first need to determine whether virtual address fragmentation is actually the problem.
If you are on a 64 bit machine (which you seem to indicate you are), I seriously doubt it is. Each 64 bit process has almost the entire 64 bit virtual memory space available, and your only worry is virtual address space fragmentation, not physical memory fragmentation (which is what the operating system must worry about). The OS memory manager already pages memory under the covers. For the foreseeable future you will not run out of virtual address space before you run out of physical memory. This is unlikely to change before we both retire.
If you have a 32 bit address space, then allocating contiguous large blocks of memory in the GB range will run you into a fragmentation problem quite quickly. There is no stock chunk-allocating memory stream in the CLR. There is one under the covers in ASP.NET (for other reasons), but it is not accessible. If you must travel this path, you are probably better off writing one yourself anyway, because the usage pattern of your application is unlikely to be similar to many others', and trying to fit your data into a 32 bit address space will likely be your perf bottleneck.
I highly recommend requiring a 64 bit process if you are manipulating GBs of data. It will do a much better job than hand-rolled solutions to 32 bit address space fragmentation, regardless of how clever you are.
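For instance, a minimal startup guard along these lines (assuming .NET 4.0 or later for Environment.Is64BitProcess; the class and method names are illustrative):

using System;

static class StartupChecks
{
    // Refuse to run as a 32 bit process when the workload needs GBs of address space.
    public static void RequireSixtyFourBitProcess()
    {
        if (!Environment.Is64BitProcess)
            throw new NotSupportedException("This application must run as a 64-bit process.");
    }
}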
Something like this:
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;

class ChunkedMemoryStream : Stream
{
    // Data is stored as a list of separately allocated chunks instead of one
    // contiguous array, so no single allocation ever needs a large block.
    private readonly List<byte[]> _chunks = new List<byte[]>();
    private int _positionChunk;   // index of the chunk the position falls in
    private int _positionOffset;  // offset within that chunk
    private long _position;       // absolute position in the stream

    public override bool CanRead
    {
        get { return true; }
    }

    public override bool CanSeek
    {
        get { return true; }
    }

    public override bool CanWrite
    {
        get { return true; }
    }

    public override void Flush() { }

    public override long Length
    {
        // Cast to long so streams larger than 2 GB do not overflow the sum.
        get { return _chunks.Sum(c => (long)c.Length); }
    }

    public override long Position
    {
        get
        {
            return _position;
        }
        set
        {
            _position = value;
            _positionChunk = 0;

            // Translate the absolute position into a (chunk, offset) pair.
            long remaining = value;
            while (remaining != 0)
            {
                if (_positionChunk >= _chunks.Count)
                    throw new OverflowException();
                if (remaining < _chunks[_positionChunk].Length)
                    break;
                remaining -= _chunks[_positionChunk].Length;
                _positionChunk++;
            }
            _positionOffset = (int)remaining;
        }
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        int result = 0;
        while ((count != 0) && (_positionChunk != _chunks.Count))
        {
            int fromChunk = Math.Min(count, _chunks[_positionChunk].Length - _positionOffset);
            Array.Copy(_chunks[_positionChunk], _positionOffset, buffer, offset, fromChunk);
            offset += fromChunk;
            count -= fromChunk;
            result += fromChunk;
            _position += fromChunk;
            _positionOffset += fromChunk;

            // Only advance to the next chunk once the current one is exhausted;
            // otherwise a partial read would corrupt the position state.
            if (_positionOffset == _chunks[_positionChunk].Length)
            {
                _positionOffset = 0;
                _positionChunk++;
            }
        }
        return result;
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        long newPos = 0;
        switch (origin)
        {
            case SeekOrigin.Begin:
                newPos = offset;
                break;
            case SeekOrigin.Current:
                newPos = Position + offset;
                break;
            case SeekOrigin.End:
                // By convention the offset is negative or zero when seeking from the end.
                newPos = Length + offset;
                break;
        }
        Position = Math.Max(0, Math.Min(newPos, Length));
        return Position;
    }

    public override void SetLength(long value)
    {
        // Not supported by this implementation.
        throw new NotSupportedException();
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        // First overwrite whatever existing chunks the position runs through.
        while ((count != 0) && (_positionChunk != _chunks.Count))
        {
            int toChunk = Math.Min(count, _chunks[_positionChunk].Length - _positionOffset);
            Array.Copy(buffer, offset, _chunks[_positionChunk], _positionOffset, toChunk);
            offset += toChunk;
            count -= toChunk;
            _position += toChunk;
            _positionOffset += toChunk;

            // Only advance to the next chunk once the current one is full.
            if (_positionOffset == _chunks[_positionChunk].Length)
            {
                _positionOffset = 0;
                _positionChunk++;
            }
        }

        // Anything left over becomes a new chunk appended at the end.
        if (count != 0)
        {
            byte[] chunk = new byte[count];
            Array.Copy(buffer, offset, chunk, 0, count);
            _chunks.Add(chunk);
            _positionChunk = _chunks.Count;
            _positionOffset = 0;
            _position += count;
        }
    }
}
class Program
{
    static void Main(string[] args)
    {
        ChunkedMemoryStream cms = new ChunkedMemoryStream();
        Debug.Assert(cms.Length == 0);
        Debug.Assert(cms.Position == 0);

        cms.Position = 0;

        byte[] helloworld = Encoding.UTF8.GetBytes("hello world");
        cms.Write(helloworld, 0, 3);
        cms.Write(helloworld, 3, 3);
        cms.Write(helloworld, 6, 5);
        Debug.Assert(cms.Length == 11);
        Debug.Assert(cms.Position == 11);

        cms.Position = 0;
        byte[] b = new byte[20];
        cms.Read(b, 3, (int)cms.Length);
        Debug.Assert(b.Skip(3).Take(11).SequenceEqual(helloworld));

        cms.Position = 0;
        cms.Write(Encoding.UTF8.GetBytes("seeya"), 0, 5);
        Debug.Assert(cms.Length == 11);
        Debug.Assert(cms.Position == 5);

        cms.Position = 0;
        cms.Read(b, 0, (int)cms.Length);
        Debug.Assert(b.Take(11).SequenceEqual(Encoding.UTF8.GetBytes("seeya world")));
        Debug.Assert(cms.Length == 11);
        Debug.Assert(cms.Position == 11);

        cms.Write(Encoding.UTF8.GetBytes(" again"), 0, 6);
        Debug.Assert(cms.Length == 17);
        Debug.Assert(cms.Position == 17);

        cms.Position = 0;
        cms.Read(b, 0, (int)cms.Length);
        Debug.Assert(b.Take(17).SequenceEqual(Encoding.UTF8.GetBytes("seeya world again")));
    }
}
The Bing team has released RecyclableMemoryStream and wrote about it here. In short, it is a MemoryStream replacement that rents its underlying buffers from a pool, so streams grow in chunks without Large Object Heap allocations, which reduces garbage collection pressure.
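A minimal usage sketch (assuming the Microsoft.IO.RecyclableMemoryStream NuGet package; the class name and tag string are illustrative):

using Microsoft.IO;

class RecyclableDemo
{
    // The manager owns the buffer pools; create one per process and reuse it.
    private static readonly RecyclableMemoryStreamManager Manager =
        new RecyclableMemoryStreamManager();

    static void Demo(byte[] data)
    {
        // GetStream returns a MemoryStream-compatible stream backed by pooled chunks.
        using (var stream = Manager.GetStream("RecyclableDemo.Demo"))
        {
            stream.Write(data, 0, data.Length);
            stream.Position = 0;
            // ... read it back, copy it elsewhere, etc.
        }
        // Disposing the stream returns its buffers to the pool.
    }
}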
I ran into a similar problem in my application. I was reading a large amount of compressed data and suffered from OutOfMemoryException using MemoryStream. I've written my own implementation of a "chunked" memory stream based on a collection of byte arrays. If you have any idea how to make this memory stream more effective, please let me know.
using System;
using System.Collections.ObjectModel;
using System.Diagnostics.Contracts;
using System.IO;
using System.Linq;

public sealed class ChunkedMemoryStream : Stream
{
    #region Constants
    private const int BUFFER_LENGTH = 65536;
    private const byte ZERO = 0;
    private const byte TWO = 2;
    #endregion

    #region Readonly & Static Fields
    private readonly Collection<byte[]> _chunks;
    #endregion

    #region Fields
    private long _length;
    private long _position;
    #endregion

    #region C'tors
    public ChunkedMemoryStream()
    {
        _chunks = new Collection<byte[]> { new byte[BUFFER_LENGTH], new byte[BUFFER_LENGTH] };
        _position = ZERO;
        _length = ZERO;
    }
    #endregion

    #region Instance Properties
    public override bool CanRead
    {
        get { return true; }
    }

    public override bool CanSeek
    {
        get { return true; }
    }

    public override bool CanWrite
    {
        get { return true; }
    }

    public override long Length
    {
        get { return _length; }
    }

    public override long Position
    {
        get { return _position; }
        set
        {
            if (!CanSeek)
                throw new NotSupportedException();
            // Clamp the position into the valid range [0, _length].
            _position = value;
            if (_position > _length)
                _position = _length;
            if (_position < ZERO)
                _position = ZERO;
        }
    }

    private byte[] CurrentChunk
    {
        get
        {
            long positionDividedByBufferLength = _position / BUFFER_LENGTH;
            var chunkIndex = Convert.ToInt32(positionDividedByBufferLength);
            byte[] chunk = _chunks[chunkIndex];
            return chunk;
        }
    }

    private int PositionInChunk
    {
        get
        {
            int positionInChunk = Convert.ToInt32(_position % BUFFER_LENGTH);
            return positionInChunk;
        }
    }

    private int RemainingBytesInCurrentChunk
    {
        get
        {
            Contract.Ensures(Contract.Result<int>() > ZERO);
            int remainingBytesInCurrentChunk = CurrentChunk.Length - PositionInChunk;
            return remainingBytesInCurrentChunk;
        }
    }

    private long Capacity
    {
        get
        {
            int numberOfChunks = _chunks.Count;
            // Cast before multiplying so the arithmetic is done in 64 bits.
            long capacity = (long)numberOfChunks * BUFFER_LENGTH;
            return capacity;
        }
    }
    #endregion

    #region Instance Methods
    public override void Flush()
    {
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        // Validate arguments before touching the buffer.
        if (buffer == null)
            throw new ArgumentNullException("buffer");
        if (offset < ZERO || count < ZERO)
            throw new ArgumentOutOfRangeException();
        if (offset + count > buffer.Length)
            throw new ArgumentException();
        if (!CanRead)
            throw new NotSupportedException();

        // Never read past the end of the stream.
        int bytesToRead = count;
        if (_length - _position < bytesToRead)
            bytesToRead = Convert.ToInt32(_length - _position);

        int bytesRead = 0;
        while (bytesToRead > ZERO)
        {
            // Copy at most the bytes remaining in the current chunk.
            int bytesFromChunk = RemainingBytesInCurrentChunk;
            if (bytesFromChunk > bytesToRead)
                bytesFromChunk = bytesToRead;
            Array.Copy(CurrentChunk, PositionInChunk, buffer, offset, bytesFromChunk);

            // Move the position in the source...
            _position += bytesFromChunk;
            // ...and in the target.
            offset += bytesFromChunk;
            bytesToRead -= bytesFromChunk;
            bytesRead += bytesFromChunk;
        }
        return bytesRead;
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        switch (origin)
        {
            case SeekOrigin.Begin:
                Position = offset;
                break;
            case SeekOrigin.Current:
                Position += offset;
                break;
            case SeekOrigin.End:
                Position = Length + offset;
                break;
        }
        return Position;
    }

    public override void SetLength(long value)
    {
        if (value > _length)
        {
            // Grow: add chunks until the capacity covers the new length.
            while (value > Capacity)
            {
                var item = new byte[BUFFER_LENGTH];
                _chunks.Add(item);
            }
        }
        else if (value < _length)
        {
            // Shrink: drop trailing chunks that are no longer needed,
            // but always keep at least two chunks allocated.
            while (_chunks.Count > TWO && Capacity - BUFFER_LENGTH >= value)
            {
                byte[] lastChunk = _chunks.Last();
                _chunks.Remove(lastChunk);
            }
        }
        _length = value;
        if (_position > _length)
            _position = _length;
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        if (!CanWrite)
            throw new NotSupportedException();

        int bytesToWrite = count;
        while (bytesToWrite > ZERO)
        {
            // Grow the chunk list first if the position sits at the end of the capacity.
            if (Capacity == _position)
                _chunks.Add(new byte[BUFFER_LENGTH]);

            // Get the remaining space in the current chunk.
            int bytesToChunk = RemainingBytesInCurrentChunk;
            // If fewer bytes remain to be written, copy only those.
            if (bytesToChunk > bytesToWrite)
                bytesToChunk = bytesToWrite;

            Array.Copy(buffer, offset, CurrentChunk, PositionInChunk, bytesToChunk);
            // Advance the offset in the source array.
            offset += bytesToChunk;
            bytesToWrite -= bytesToChunk;
            // Advance the position; the length only grows when writing past the end.
            _position += bytesToChunk;
            if (_position > _length)
                _length = _position;
        }
    }

    /// <summary>
    /// Gets the entire content of the stream regardless of the Position value
    /// and returns the output as a byte array.
    /// </summary>
    /// <returns>byte array</returns>
    public byte[] ToArray()
    {
        var outputArray = new byte[Length];
        if (outputArray.Length != ZERO)
        {
            long outputPosition = ZERO;
            foreach (byte[] chunk in _chunks)
            {
                var remainingLength = (Length - outputPosition) > chunk.Length
                                          ? chunk.Length
                                          : Length - outputPosition;
                Array.Copy(chunk, ZERO, outputArray, outputPosition, remainingLength);
                outputPosition = outputPosition + remainingLength;
            }
        }
        return outputArray;
    }

    /// <summary>
    /// Sets Position to the first element and writes the entire stream to another stream.
    /// </summary>
    /// <param name="stream">Target stream</param>
    public void WriteTo(Stream stream)
    {
        Contract.Requires(stream != null);
        Position = ZERO;
        var buffer = new byte[BUFFER_LENGTH];
        int bytesRead;
        do
        {
            bytesRead = Read(buffer, ZERO, BUFFER_LENGTH);
            stream.Write(buffer, ZERO, bytesRead);
        } while (bytesRead > ZERO);
    }
    #endregion
}
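For completeness, a short usage sketch of this class (the file name output.bin is illustrative): write a payload, then copy it out with ToArray and WriteTo.

using System;
using System.IO;
using System.Text;

class ChunkedMemoryStreamDemo
{
    static void Main()
    {
        byte[] payload = Encoding.UTF8.GetBytes("hello chunked world");

        using (var cms = new ChunkedMemoryStream())
        {
            cms.Write(payload, 0, payload.Length);

            // ToArray copies the whole content regardless of Position.
            byte[] copy = cms.ToArray();
            Console.WriteLine(Encoding.UTF8.GetString(copy));

            // WriteTo rewinds the stream and copies it into another stream.
            using (FileStream file = File.Create("output.bin"))
            {
                cms.WriteTo(file);
            }
        }
    }
}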