Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Processing a large file in .NET

The Problem

I need to be able to save and read a very big data structure using C#. The structure itself is rather simple; it's a very long array of a simple structs of a constant size.

Just an example for clarity:

struct st {
UInt32 a;
UInt16 b;
//etc.
} completion ports

st[] data = new st[1024*1024*100]

I want to be able to save and load these to files as fast and efficient as possible.

General Direction

My idea so far, is to cut the data into segments, conceptually of course, assign those segments to tasks and just write them into the file asynchronously. FileStream.WriteAsync appears to be perfect for this.

My problem is with the reading, From the FileStream.ReadAsync API it seems completely reasonable that the results can be cut in the middle of each structure, halfway across a primitive in fact. Of course I can work around this, but I'm not sure what would be the best way, and how much will I interfere with the OS's buffering mechanism.

Eventually I plan to create a MemoryStream from each buffer with MemoryStream.MemoryStream(byte[]) and read each into the struct's with a binary reader.

The Question

So what would be the best way to solve this? Is my direction good? Are there any better solutions? Code examples and links would be appreciated...

Conclusions

After doing performance testing I found that reading a file with BinaryReader, or using multiple readers with FileStream.ReadAsync, gives approximately the same performance.

Soo.... the question is pointless.

like image 418
AK_ Avatar asked Mar 25 '23 04:03

AK_


2 Answers

Your biggest bottleneck is going to be IO, which has to be performed with exclusive access to the file. The actual byte-crunching for this will be fast - you are going to do just as well writing it directly to the file (noting that the FileStream itself has a buffer, or you can add an extra layer with BufferedStream) than you would by serializing different parts in-memory and then copying each in-memory part to the stream separately.

My advice: just write the data in a single thread. Frankly I'm not sure I'd even bother with async (hint: async code adds overhead), especially if the buffer is keeping up. I also wouldn't use BiaryWriter / BinaryReader - I'd just write it raw. One tricky you could do is to use some unsafe code to copy the data in blocks, to avoid having to even look at individual objects, but that is at the harder end of things... I'll try to do an example.

Here's an example of read/write, noting performance first:

Write: 2012ms
Read: 1089ms
File: 838,860,804 bytes

Code:

[DllImport("msvcrt.dll", EntryPoint = "memcpy", CallingConvention = CallingConvention.Cdecl, SetLastError = false)]
public static extern IntPtr memcpy(IntPtr dest, IntPtr src, UIntPtr count);

unsafe static st[] Read(string path)
{
    using (var file = File.OpenRead(path))
    {
        int size = sizeof(st);
        const int BLOCK_SIZE = 512; // process at a time
        byte[] buffer = new byte[BLOCK_SIZE * size];

        UIntPtr bufferLen = new UIntPtr((uint)buffer.Length);
        fixed (byte* bufferPtr = buffer)
        {
            Fill(file, buffer, 0, 4);
            int len = ((int*)bufferPtr)[0];

            st[] result = new st[len];
            fixed (st* dataPtr = result)
            {
                st* rawPtr = dataPtr;
                IntPtr source= new IntPtr(bufferPtr);
                while (len >= BLOCK_SIZE)
                {
                    Fill(file, buffer, 0, buffer.Length);
                    memcpy(new IntPtr(rawPtr), source, bufferLen);
                    len -= BLOCK_SIZE;
                    rawPtr += BLOCK_SIZE;
                }
                if (len > 0)
                {
                    Fill(file, buffer, 0, len * size);
                    memcpy(new IntPtr(rawPtr), source, new UIntPtr((uint)(len * size)));
                }
            }
            return result;
        }
    }


}
static void Fill(Stream source, byte[] buffer, int offset, int count)
{
    int read;
    while (count > 0 && (read = source.Read(buffer, offset, count)) > 0)
    {
        offset += read;
        count -= read;
    }
    if (count > 0) throw new EndOfStreamException();
}

unsafe static void Write(st[] data, string path)
{
    using (var file = File.Create(path))
    {
        int size = sizeof(st);
        const int BLOCK_SIZE = 512; // process at a time
        byte[] buffer = new byte[BLOCK_SIZE * size];

        int len = data.Length;
        UIntPtr bufferLen = new UIntPtr((uint)buffer.Length);
        fixed (st* dataPtr = data)
        fixed (byte* bufferPtr = buffer)
        {
            // write the number of elements
            ((int*)bufferPtr)[0] = data.Length;
            file.Write(buffer, 0, 4);

            st* rawPtr = dataPtr;
            IntPtr destination = new IntPtr(bufferPtr);
            // write complete blocks of BLOCK_SIZE
            while (len >= BLOCK_SIZE)
            {
                memcpy(destination, new IntPtr(rawPtr), bufferLen);
                len -= BLOCK_SIZE;
                rawPtr += BLOCK_SIZE;
                file.Write(buffer, 0, buffer.Length);
            }
            if (len > 0)
            {   // write an incomplete block, if necessary
                memcpy(destination, new IntPtr(rawPtr), new UIntPtr((uint)(len * size)));
                file.Write(buffer, 0, len * size);
            }
        }
    }
}
like image 197
Marc Gravell Avatar answered Apr 01 '23 11:04

Marc Gravell


[EDIT] I have updated this post to include a complete compilable sample, and also to address the issues raised by @Daniel in his comments below. As a result, this code no longer uses any "dangerous" methods and has no Code Analysis warnings. [/EDIT]

There is a way you can speed things up a little if your structs contain ONLY blittable types.

You can use marshaling to read the data directly into an array without making additional copies, like so (complete compilable example):

using System;
using System.ComponentModel;
using System.Diagnostics;
using System.IO;
using System.Runtime.InteropServices;
using Microsoft.Win32.SafeHandles;

namespace ConsoleApplication1
{
    internal class Program
    {
        struct TestStruct // Mutable for brevity; real structs should be immutable.
        {
            public byte   ByteValue;
            public short  ShortValue;
            public int    IntValue;
            public long   LongValue;
            public float  FloatValue;
            public double DoubleValue;
        }

        static void Main()
        {
            var array = new TestStruct[10];

            for (byte i = 0; i < array.Length; ++i)
            {
                array[i].ByteValue   = i;
                array[i].ShortValue  = i;
                array[i].IntValue    = i;
                array[i].LongValue   = i;
                array[i].FloatValue  = i;
                array[i].DoubleValue = i;
            }

            Directory.CreateDirectory("C:\\TEST");

            using (var output = new FileStream(@"C:\TEST\TEST.BIN", FileMode.Create))
                FastWrite(output, array, 0, array.Length);

            using (var input = new FileStream(@"C:\TEST\TEST.BIN", FileMode.Open))
                array = FastRead<TestStruct>(input, array.Length);

            for (byte i = 0; i < array.Length; ++i)
            {
                Trace.Assert(array[i].ByteValue   == i);
                Trace.Assert(array[i].ShortValue  == i);
                Trace.Assert(array[i].IntValue    == i);
                Trace.Assert(array[i].LongValue   == i);
                Trace.Assert(array[i].FloatValue  == i);
                Trace.Assert(array[i].DoubleValue == i);
            }
        }

        /// <summary>
        /// Writes a part of an array to a file stream as quickly as possible,
        /// without making any additional copies of the data.
        /// </summary>
        /// <typeparam name="T">The type of the array elements.</typeparam>
        /// <param name="fs">The file stream to which to write.</param>
        /// <param name="array">The array containing the data to write.</param>
        /// <param name="offset">The offset of the start of the data in the array to write.</param>
        /// <param name="count">The number of array elements to write.</param>
        /// <exception cref="IOException">Thrown on error. See inner exception for <see cref="Win32Exception"/></exception>

        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Reliability", "CA2004:RemoveCallsToGCKeepAlive")]

        public static void FastWrite<T>(FileStream fs, T[] array, int offset, int count) where T: struct
        {
            int sizeOfT = Marshal.SizeOf(typeof(T));
            GCHandle gcHandle = GCHandle.Alloc(array, GCHandleType.Pinned);

            try
            {
                uint bytesWritten;
                uint bytesToWrite = (uint)(count * sizeOfT);

                if
                (
                    !WriteFile
                    (
                        fs.SafeFileHandle,
                        new IntPtr(gcHandle.AddrOfPinnedObject().ToInt64() + (offset*sizeOfT)),
                        bytesToWrite,
                        out bytesWritten,
                        IntPtr.Zero
                    )
                )
                {
                    throw new IOException("Unable to write file.", new Win32Exception(Marshal.GetLastWin32Error()));
                }

                Debug.Assert(bytesWritten == bytesToWrite);
            }

            finally
            {
                gcHandle.Free();
            }
        }

        /// <summary>
        /// Reads array data from a file stream as quickly as possible,
        /// without making any additional copies of the data.
        /// </summary>
        /// <typeparam name="T">The type of the array elements.</typeparam>
        /// <param name="fs">The file stream from which to read.</param>
        /// <param name="count">The number of elements to read.</param>
        /// <returns>
        /// The array of elements that was read. This may be less than the number that was
        /// requested if the end of the file was reached. It may even be empty.
        /// NOTE: There may still be data left in the file, even if not all the requested
        /// elements were returned - this happens if the number of bytes remaining in the
        /// file is less than the size of the array elements.
        /// </returns>
        /// <exception cref="IOException">Thrown on error. See inner exception for <see cref="Win32Exception"/></exception>

        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Reliability", "CA2004:RemoveCallsToGCKeepAlive")]

        public static T[] FastRead<T>(FileStream fs, int count) where T: struct
        {
            int sizeOfT = Marshal.SizeOf(typeof(T));

            long bytesRemaining  = fs.Length - fs.Position;
            long wantedBytes     = count * sizeOfT;
            long bytesAvailable  = Math.Min(bytesRemaining, wantedBytes);
            long availableValues = bytesAvailable / sizeOfT;
            long bytesToRead     = (availableValues * sizeOfT);

            if ((bytesRemaining < wantedBytes) && ((bytesRemaining - bytesToRead) > 0))
            {
                Debug.WriteLine("Requested data exceeds available data and partial data remains in the file.", "Dmr.Common.IO.Arrays.FastRead(fs,count)");
            }

            T[] result = new T[availableValues];

            if (availableValues == 0)
                return result;

            GCHandle gcHandle = GCHandle.Alloc(result, GCHandleType.Pinned);

            try
            {
                uint bytesRead;

                if
                (
                    !ReadFile
                    (
                        fs.SafeFileHandle,
                        gcHandle.AddrOfPinnedObject(),
                        (uint)bytesToRead,
                        out bytesRead,
                        IntPtr.Zero
                    )
                )
                {
                    throw new IOException("Unable to read file.", new Win32Exception(Marshal.GetLastWin32Error()));
                }

                Debug.Assert(bytesRead == bytesToRead);
            }

            finally
            {
                gcHandle.Free();
            }

            return result;
        }

        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Interoperability", "CA1415:DeclarePInvokesCorrectly")]
        [DllImport("kernel32.dll", SetLastError=true)]
        [return: MarshalAs(UnmanagedType.Bool)]

        private static extern bool WriteFile
        (
            SafeFileHandle       hFile,
            IntPtr               lpBuffer,
            uint                 nNumberOfBytesToWrite,
            out uint             lpNumberOfBytesWritten,
            IntPtr               lpOverlapped
        );

        /// <summary>See the Windows API documentation for details.</summary>

        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Interoperability", "CA1415:DeclarePInvokesCorrectly")]
        [DllImport("kernel32.dll", SetLastError=true)]
        [return: MarshalAs(UnmanagedType.Bool)]

        private static extern bool ReadFile
        (
            SafeFileHandle       hFile,
            IntPtr               lpBuffer,
            uint                 nNumberOfBytesToRead,
            out uint             lpNumberOfBytesRead,
            IntPtr               lpOverlapped
        );
    }
}

Then you could create a BlockingCollection to store the incoming data and use one thread to populate it and a separate thread to consume it.

The thread that read data into the queue could look like this:

public void ReadIntoQueue<T>(FileStream fs, BlockingCollection<T[]> queue, int blockSize) where T: struct
{
    while (true)
    {
        var data = FastRead<T>(fs, blockSize);

        if (data.Length == 0)
        {
            queue.CompleteAdding();
            break;
        }

        queue.Add(data);
    }
}

And the consuming thread would remove stuff from the queue like so:

public void ProcessDataFromQueue<T>(BlockingCollection<T[]> queue) where T : struct
{
    foreach (var array in queue.GetConsumingEnumerable())
    {
        // Do something with 'array'
    }
}
like image 25
Matthew Watson Avatar answered Apr 01 '23 11:04

Matthew Watson