Atomically change two numbers without using locks

I am creating an append-only, in-memory data structure that appends records, serialized to byte arrays, into that memory. It needs to be thread safe and extremely fast, so I came up with the following code, which has worked great so far. (This is pseudo code; the actual version is more complex and does some other things, but it illustrates the idea.)

public sealed class MemoryList : IDisposable
{
    private int nextOffset = 0;
    private readonly MemoryMappedFile file;
    private readonly MemoryMappedViewAccessor va;

    public MemoryList(uint capacity)
    {
        // Some checks on capacity here
        var mapName = Guid.NewGuid().ToString("N");
        this.file = MemoryMappedFile.CreateNew(mapName, capacity);
        this.va = file.CreateViewAccessor(0, capacity);
    }

    public void AppendMessage(byte[] messagePayload)
    {
        if (messagePayload == null) 
            throw new ArgumentNullException(nameof(messagePayload));
        if (messagePayload.Length == 0)
            throw new ArgumentOutOfRangeException(nameof(messagePayload));

        if (TryReserveCapacity(messagePayload.Length, out var offsetToWriteTo))
        {
            this.va.Write(offsetToWriteTo, messagePayload.Length);
            this.va.WriteArray(offsetToWriteTo + sizeof(int), messagePayload, 0, messagePayload.Length);
        }
    }

    private bool TryReserveCapacity(int dataLength, out long reservedOffset)
    {
        // Reserve enough room to store the data plus its size prefix.
        var packetSize = sizeof(int) + dataLength;
        reservedOffset = Interlocked.Add(ref this.nextOffset, packetSize) - packetSize;

        // Check against the end of our own reservation; re-reading nextOffset
        // here would race with reservations made by other threads.
        if (reservedOffset + packetSize <= this.va.Capacity)
            return true;
        reservedOffset = -1;
        return false;
    }

    public void Dispose()
    {
        // Dispose the view before the file that backs it.
        va?.Dispose();
        file?.Dispose();
    }
}

This is extremely fast and it works very well. I was not able to break it no matter how hard I tried.

So now, for each appended message, I need TryReserveCapacity to also output the logical index of that message: the first message gets index 0, the second gets index 1, and so on. This requires two Interlocked calls, one for the offset and one for the messageIndex, which together are not atomic, so I can end up with race conditions leading to the following situation:

MI: 101, Offset: 10000
MI: 100, Offset: 10500

Any ideas on how to guarantee that a message with a smaller offset never gets a bigger MI, all without using any locks?

So basically how do we change the following method to behave correctly?

private bool TryReserveCapacity(int dataLength, out long reservedOffset, out long messageId)
{
    // Reserve enough room to store the data plus its size prefix.
    var packetSize = sizeof(int) + dataLength;
    reservedOffset = Interlocked.Add(ref this.nextOffset, packetSize) - packetSize;
    // Race: another thread can run both Interlocked calls between these two lines.
    messageId = Interlocked.Increment(ref this.currentMessageId);

    if (reservedOffset + packetSize <= this.va.Capacity)
        return true;
    reservedOffset = -1;
    return false;
}

P.S. I am aware of the endianness problems in the example code, but as I said, just consider it pseudo code to illustrate the problem.

Mihail Shishkov asked Nov 06 '22

1 Answer

Sorry if this is not directly addressing your primary concern (non-locking atomicity), but I see that you are manipulating memory-mapped files using the MemoryMappedFile and MemoryMappedViewAccessor classes.

I don't know whether current iterations of the .NET Framework have addressed this, but in a codebase we wrote around three years ago, we found that memory-mapped file manipulation through those classes offered really poor performance (around 7x slower, if I recall correctly) compared with using the Win32 API and direct pointer manipulation of the mapped memory, even inside a managed C++/CLI class.

I strongly suggest you give this method a try; you may be surprised by the performance gains (as we certainly were). The gain may even be significant enough that you can afford the cost of standard locking to achieve the atomicity you desire.

If you'd like to explore this avenue, here is a code snippet that shows the basics of the technique.

Int32 StationHashStorage::Open() {
   msclr::lock lock(_syncRoot);
   if( _isOpen )
      return 0;
   String^ fileName = GetFullFileName();

   _szInBytes = ComputeFileSizeInBytes(fileName);
   String^ mapExtension = GetFileExtension();
   String^ mapName = String::Format("{0}{1}_{2}", _stationId, _date.ToString("yyyyMMdd"), mapExtension);

   marshal_context context;
   LPCTSTR pMapName = context.marshal_as<const TCHAR*>(mapName);

   {
      msclr::lock lock( _openLock );
      // Try to see if another storage instance has requested the same memory-mapped file and share it
      _hMapping = OpenFileMapping(FILE_MAP_READ | FILE_MAP_WRITE, FALSE, pMapName);
         if( !_hMapping ) {
            // This is the first instance acquiring the file
            LPCTSTR pFileName = context.marshal_as<const TCHAR*>(fileName);
            // Try to open the existing file, or create new one if not exists
            _hFile = CreateFile(pFileName, 
                                GENERIC_READ | GENERIC_WRITE, 
                                FILE_SHARE_READ,
                                NULL,
                                OPEN_ALWAYS,
                                FILE_ATTRIBUTE_NORMAL,
                                NULL);
            if( !_hFile )
               throw gcnew IOException(String::Format(Strings::CreateFileFailed, GetLastError(), _stationId));
            _hMapping = CreateFileMapping(_hFile, 
                                          NULL,
                                          PAGE_READWRITE | SEC_COMMIT,
                                          0,
                                          _szInBytes,
                                          pMapName);
            if( !_hMapping ) 
               throw gcnew IOException(String::Format(Strings::CreateMappingFailed, GetLastError(), _stationId));
            _usingSharedFile = false;
         } else {
            _usingSharedFile = true;
         }
   }

   // _pData gives you access to the entire requested memory range; you can
   // directly dereference it, memcpy it, etc.
   _pData = (UInt32*)::MapViewOfFile(_hMapping, FILE_MAP_READ | FILE_MAP_WRITE, 0, 0, 0);

   if( !_pData ) 
      throw gcnew IOException(String::Format(Strings::MapViewOfFileFailed, ::GetLastError(), _stationId));

   // Warm up the view by touching every page (1024 UInt32s = 4 KB).
   Int32 dummy = 0;
   for( int i = 0; i < _szInBytes / sizeof(Int32); i += 1024 ) {
      dummy ^= _pData[i];
   }
   _isOpen = true;
   // Return the dummy value so the optimizer cannot remove the apparently useless loop.
   return dummy;
}

void StationHashStorage::Cleanup() {
     if( !_disposed ) {
      // dispose unmanaged resources here
      if( _pData ) {
         if( !UnmapViewOfFile(_pData) ) 
            LOG_ERROR(Strings::UnmapViewOfFileFailed, ::GetLastError(), _stationId);
         _pData = NULL;
      }

      if( _hMapping ) {
         if( !CloseHandle(_hMapping) ) 
            LOG_ERROR(Strings::CloseMappingFailed, ::GetLastError(), _stationId);
         _hMapping = NULL;
      }


      if( _hFile ) {
         if( !CloseHandle(_hFile) ) 
            LOG_ERROR(Strings::CloseFileFailed, ::GetLastError(), _stationId);
         _hFile = NULL;
      }
      _disposed = true;
   }
}

Now, regarding your real question: is it possible for you to embed the generated ID as part of the data stream? My idea would be something like this:

  1. Pre-fill the whole memory region with a known dummy value (maybe 0xffffffff).

  2. Keep your current atomic capacity-check logic.

  3. After writing a message payload, immediately write the computed message Id after it (your capacity check would need to account for this extra data).

  4. Instead of using Interlocked.Increment to get the next Id, enter a loop that reads the memory just before the current message (the previous message's Id) until it differs from the dummy value. Once you exit the loop, the current message Id is the read value + 1.

This would require some special handling of the first inserted message (as it needs to seed the first Id marker in your stream). You would also need to be careful (if you are using long Ids in 32-bit mode) that your Id reads and writes are atomic.
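The steps above can be sketched as follows in standard C++ (plain std::atomic and an in-memory word buffer stand in for the C++/CLI mapped view; all names are illustrative, and records are fixed-size here for brevity, whereas the real structure stores variable-length payloads):

```cpp
#include <atomic>
#include <cstdint>
#include <thread>
#include <vector>

// Sketch of the "spin on the previous Id slot" scheme (illustrative, not the
// poster's code). Layout per record: [payload word][messageId word].
constexpr uint32_t kUnwritten = 0xFFFFFFFFu; // dummy value pre-filled everywhere

struct AppendLog {
    std::vector<std::atomic<uint32_t>> slots; // pre-filled with the dummy value
    std::atomic<uint32_t> nextSlot{0};

    explicit AppendLog(std::size_t words) : slots(words) {
        for (auto& w : slots) w.store(kUnwritten, std::memory_order_relaxed);
    }

    // Appends one payload word; returns its logical message Id, or -1 when full.
    int64_t Append(uint32_t payload) {
        // Step 2: the usual atomic capacity reservation.
        uint32_t offset = nextSlot.fetch_add(2, std::memory_order_relaxed);
        if (offset + 2 > slots.size())
            return -1;

        slots[offset].store(payload, std::memory_order_relaxed);

        uint32_t id;
        if (offset == 0) {
            id = 0; // the first message seeds the Id chain
        } else {
            // Step 4: spin until the previous record's Id slot is published.
            uint32_t prev;
            while ((prev = slots[offset - 1].load(std::memory_order_acquire)) == kUnwritten)
                std::this_thread::yield();
            id = prev + 1;
        }
        // Step 3: publish our own Id right after the payload.
        slots[offset + 1].store(id, std::memory_order_release);
        return id;
    }
};
```

One caveat worth noting: a writer that reserves space but never publishes its Id slot (for example, because it dies mid-append) stalls every later writer, so a real implementation needs a recovery story for that.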

Good luck, and I really encourage you to give the Win32 API a try; it would be very interesting to find out whether things have, hopefully, improved! Feel free to contact me if you need assistance with the C++/CLI code.
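As a final aside on the non-locking atomicity itself: whenever the offset and the message Id each fit in 32 bits, the pair can be packed into a single 64-bit word and advanced with one compare-exchange loop, so no thread can ever observe the two values out of step. A minimal sketch in standard C++ (illustrative names, under the assumption that 32 bits suffice for each counter):

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical sketch: pack (offset, messageId) into one 64-bit word and
// advance both with a single compare-exchange. Not the poster's code.
struct Reservation { uint32_t offset; uint32_t messageId; };

class PackedCounter {
    std::atomic<uint64_t> state{0}; // high 32 bits: offset, low 32 bits: messageId

public:
    // Atomically reserves packetSize bytes and the next message Id.
    // Returns false (changing nothing) once capacity would be exceeded.
    bool TryReserve(uint32_t packetSize, uint64_t capacity, Reservation& out) {
        uint64_t cur = state.load(std::memory_order_relaxed);
        for (;;) {
            uint32_t offset = static_cast<uint32_t>(cur >> 32);
            uint32_t id     = static_cast<uint32_t>(cur);
            if (offset + static_cast<uint64_t>(packetSize) > capacity)
                return false;
            uint64_t next = (static_cast<uint64_t>(offset + packetSize) << 32)
                          | static_cast<uint64_t>(id + 1);
            if (state.compare_exchange_weak(cur, next,
                                            std::memory_order_acq_rel,
                                            std::memory_order_relaxed)) {
                out = Reservation{offset, id};
                return true;
            }
            // cur was reloaded by compare_exchange_weak; retry.
        }
    }
};
```

With equal packet sizes the offset is simply id * packetSize, but the point is that the scheme also holds for variable-length packets, because both fields move in one atomic step.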

BlueStrat answered Nov 10 '22