Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

NEventStore optimistic lock

I'm new to NEventStore and event sourcing in general. In a project I want to use NEventStore for persisting events generated by our aggregates, but I have some problem to correctly handle concurrency.

How can I write to the same stream using an optimistic lock?

Let's say I have 2 instances of the same aggregate that are loaded at revision 1 from 2 different threads. Then the first thread call command A and the second thread call command B . Using an optimistic lock one of the aggregate should fail with a concurrency exception.

I thought to use the maxRevision to open the stream from the point that the aggregate is loaded, but seems that the CommitChanges never fail, also if I pass an old revision.

What I'm missing? Is optimistic lock possible/correct when using NEventStore/Event Sourcing?

Here is the code that I have used to reproduce the problem:

namespace NEventStore.Example
{
    using System;
    using System.Transactions;
    using NEventStore;
    using NEventStore.Dispatcher;
    using NEventStore.Persistence.SqlPersistence.SqlDialects;

    internal static class MainProgram
    {
        private static readonly Guid StreamId = Guid.NewGuid(); // aggregate identifier
        private static IStoreEvents store;

        private static void Main()
        {
            using (var scope = new TransactionScope())
            using (store = WireupEventStore())
            {
                Client1(revision: 0);

                Client2(revision: 0);

                scope.Complete();
            }

            Console.WriteLine(Resources.PressAnyKey);
            Console.ReadKey();
        }

        private static IStoreEvents WireupEventStore()
        {
             return Wireup.Init()
                .UsingInMemoryPersistence()
                .Build();
        }

        private static void Client1(int revision)
        {
            using (var stream = store.OpenStream(StreamId, 0, revision))
            {
                var @event = new SomeDomainEvent { Value = "Client 1 - event 1." };

                stream.Add(new EventMessage { Body = @event });


                stream.CommitChanges(Guid.NewGuid());
            }
        }

        private static void Client2(int revision)
        {
            using (var stream = store.OpenStream(StreamId, 0, revision))
            {
                var @event = new SomeDomainEvent { Value = "Client 2 - event 1." };

                stream.Add(new EventMessage { Body = @event });


                stream.CommitChanges(Guid.NewGuid());
            }
        }
    }
}

I expect client 2 to fail because I open the stream with an old revision.

UPDATE 26/08/2013: I have tested the same code using Sql server and seems to work as expected.

namespace NEventStore.Example
{
    using System;
    using System.Transactions;
    using NEventStore;
    using NEventStore.Dispatcher;
    using NEventStore.Persistence.SqlPersistence.SqlDialects;

    internal static class MainProgram
    {
        private static readonly Guid StreamId = Guid.NewGuid(); // aggregate identifier
        private static IStoreEvents store;

        private static void Main()
        {
            using (store = WireupEventStore())
            {
                OpenOrCreateStream();

                AppendToStream_Client1(revision: 1);

                AppendToStream_Client2(revision: 1); // throws an error
                // AppendToStream_Client2(revision: 2); // works
            }

            Console.WriteLine(Resources.PressAnyKey);
            Console.ReadKey();
        }

        private static IStoreEvents WireupEventStore()
        {
             return Wireup.Init()
                .LogToOutputWindow()
                .UsingInMemoryPersistence()
                .UsingSqlPersistence("EventStore") // Connection string is in app.config
                    .WithDialect(new MsSqlDialect())
                    .InitializeStorageEngine()
                    .UsingJsonSerialization()
                .Build();
        }

        private static void OpenOrCreateStream()
        {
            using (var stream = store.OpenStream(StreamId, 0, int.MaxValue))
            {
                var @event = new SomeDomainEvent { Value = "Initial event." };

                stream.Add(new EventMessage { Body = @event });
                stream.CommitChanges(Guid.NewGuid());
            }
        }

        private static void AppendToStream_Client1(int revision)
        {
            using (var stream = store.OpenStream(StreamId, int.MinValue, revision))
            {
                var @event = new SomeDomainEvent { Value = "Second event 1." };

                stream.Add(new EventMessage { Body = @event });
                stream.CommitChanges(Guid.NewGuid());
            }
        }

        private static void AppendToStream_Client2(int revision)
        {
            using (var stream = store.OpenStream(StreamId, int.MinValue, revision))
            {
                var @event = new SomeDomainEvent { Value = "Second event 2." };

                stream.Add(new EventMessage { Body = @event });
                stream.CommitChanges(Guid.NewGuid());
            }
        }
    }
}

So back to my question: to enable optimistic lock should I use revision when opening the stream? There are other possible implementations or guidelines?

thanks

like image 977
Davide Icardi Avatar asked Jun 03 '26 21:06

Davide Icardi


1 Answers

Firstly, the in-memory persistence implementation, whose primary purpose is testing, is not transaction aware. In your original example, client 2 will simply append it's event to the stream. Try running the above with a persistence store that supports transactions (SQL & Raven, but not Mongo).

Secondly, specifying the min/max revision when opening a stream is used for different purposes:

  1. When re-hydrating an aggregate, and no snapshots are available, you would specify (min:0, max:int.MaxValue), as you are interested in retrieving all of the events.
  2. When re-hydrating an aggregate and a snapshot is available, you would specify (min:snapshot.Version, max:int.MaxValue) to get all events that have occurred since the snapshot.
  3. When saving an aggregate, you would specify (min:0, max:Aggregate.Version). The Aggregate.Version is derived during re-hydration. If same aggregate is re-hydrated at the same time somewhere else and saved, you'll have a race condition and a ConcurrencyException will occur.

Support for most of this would be encapsulated in a domain framework. See AggregateBase and EventStoreRepository in CommonDomain

Thirdly, and most importantly, updating >1 stream in a single transaction is a code smell. If you are doing DDD/ES, the stream represents a single aggregate root which, by definition, is a consistency boundary. Creating/updating more than one AR in a transaction breaks this. NEventStore's transaction support was (reluctantly) added so it could work with other tools, i.e. transactionally read a command from MSMQ/NServiceBus/whatever and handle it, or, transactionally dispatch a commit message to a queue and mark it as such. Personally, I'd would recommend that you do your best to avoid 2PC.


Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!