I am using DDD with CQRS and Event Sourcing.
I need to use an Event Store (specifically this event store) within my custom implementation of IEventStore
to persist and retrieve domain events but I am having difficulties with the approach to take that deals with serialization/deserialization.
This is the interface I am implementing:
    public interface IEventStore
    {
        Task<IEnumerable<IDomainEvent>> GetEventsAsync(Identity aggregateIdentity, Type aggregateType);
        Task PersistAsync(IAggregateRoot aggregateRoot, IEnumerable<IDomainEvent> domainEvents);
    }
Outside my implementation of IEventStore I can have mappers from every IDomainEvent into some serializable/deserializable EventDto or JSON string. That's not a problem.
But these are my restrictions:
- My domain events are immutable objects that implement IDomainEvent (i.e. no setters).
- My domain events are not always easily serializable/deserializable in a generic way. They often have abstract or interface properties, so the concrete mappers between my domain events and some serializable object (a JSON string or an event DTO) are decided outside my IEventStore implementation.
- My IEventStore implementation needs to be generic: if I add new domain event types, I should not need to touch anything within the IEventStore implementation.
- My IEventStore implementation can have specific implementations of IMapper<TSource, TDestination> injected, so that I can use them to serialize/deserialize between specific types (not interfaces).
    public interface IMapper<in TSource, out TDestination>
    {
        TDestination Map(TSource source); // I have implementations of this if needed
    }
This below is my attempt:
    public class MyEventStore
        : IEventStore
    {
        private readonly IStreamNameFactory _streamNameFactory;
        private readonly IEventStoreConnection _eventStoreConnection; // Greg Young's EventStore product, which I want to use as the database
        private readonly IDomainEventFactory _domainEventFactory;
        private readonly IEventDataFactory _eventDataFactory;

        // The constructor name must match the class name (MyEventStore).
        public MyEventStore(
            IStreamNameFactory streamNameFactory,
            IEventStoreConnection eventStoreConnection,
            IDomainEventFactory domainEventFactory,
            IEventDataFactory eventDataFactory)
        {
            _streamNameFactory = streamNameFactory;
            _eventStoreConnection = eventStoreConnection;
            _domainEventFactory = domainEventFactory;
            _eventDataFactory = eventDataFactory;
        }

        public async Task<IEnumerable<IDomainEvent>> GetEventsAsync(
            Identity aggregateIdentity,
            Type aggregateType)
        {
            var aggregateIdentityValue = aggregateIdentity.Value;
            var streamName = _streamNameFactory.Create(aggregateIdentityValue, aggregateType);

            var streamEventSlice =
                await _eventStoreConnection.ReadStreamEventsForwardAsync(streamName, 0, Int32.MaxValue, false);

            // Turn every stored event back into its domain event.
            var domainEvents = streamEventSlice
                .Events
                .Select(x => _domainEventFactory.Create(x));

            return domainEvents;
        }

        [SuppressMessage("ReSharper", "PossibleMultipleEnumeration")]
        public async Task PersistAsync(
            IAggregateRoot aggregateRoot,
            IEnumerable<IDomainEvent> domainEvents)
        {
            // Work out the stream version expected before these events were applied.
            var numberOfEvents = domainEvents.Count();
            var aggregateRootVersion = aggregateRoot.Version;
            var originalVersion = aggregateRootVersion - numberOfEvents;
            var expectedVersion = originalVersion - 1;

            var aggregateIdentityValue = aggregateRoot.AggregateIdentity.Value;
            var aggregateRootType = aggregateRoot.GetType();
            var streamName = _streamNameFactory.Create(aggregateIdentityValue, aggregateRootType);
            var assemblyQualifiedName = aggregateRootType.AssemblyQualifiedName;

            var eventsToStore = domainEvents.Select(x => _eventDataFactory.Create(x, assemblyQualifiedName));

            await _eventStoreConnection.AppendToStreamAsync(streamName, expectedVersion, eventsToStore);
        }
    }
The problem is mainly, as you can imagine, in the IDomainEventFactory implementation. I need a class that implements the following interface:
    public interface IDomainEventFactory
    {
        IDomainEvent Create(ResolvedEvent resolvedEvent);
    }
This class needs to know, at runtime, which specific IDomainEvent it has to deserialize the resolvedEvent into. In other words, if the event being retrieved is a JSON representation of MyThingCreatedEvent, maybe I can use a service such as IMapper<ResolvedEvent, MyThingCreatedEvent>. But if the event being retrieved is a JSON representation of MyThingUpdatedEvent, then I would need a service such as IMapper<ResolvedEvent, MyThingUpdatedEvent>.
Some approaches came to my mind.
OPTION 1:
I thought I could have the IDomainEventFactory implementation use the Autofac IComponentContext so that at runtime I could somehow manage to do some _componentContext.Resolve(theNeededType). But I don't know how to retrieve the IMapper that I need. Maybe this is possible, but I doubt it.
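For reference, a minimal sketch of how Option 1 could look. It assumes the stored record carries the assembly-qualified name of the event type. A plain dictionary stands in for the container, and the names StoredEvent, MyThingCreatedEventMapper and MapperLookup are hypothetical. With Autofac, the dictionary lookup would be replaced by resolving the closed mapper type from the IComponentContext.

```csharp
using System;
using System.Collections.Generic;

public interface IDomainEvent { }

public interface IMapper<in TSource, out TDestination>
{
    TDestination Map(TSource source);
}

// Hypothetical stand-in for the record read back from the store.
public sealed class StoredEvent
{
    public StoredEvent(string eventTypeName, string json)
    {
        EventTypeName = eventTypeName;
        Json = json;
    }

    public string EventTypeName { get; }
    public string Json { get; }
}

public sealed class MyThingCreatedEvent : IDomainEvent
{
    public MyThingCreatedEvent(string name) { Name = name; }
    public string Name { get; }
}

// Hypothetical mapper; a real one would parse the JSON payload.
public sealed class MyThingCreatedEventMapper : IMapper<StoredEvent, MyThingCreatedEvent>
{
    public MyThingCreatedEvent Map(StoredEvent source)
    {
        return new MyThingCreatedEvent(source.Json);
    }
}

public static class MapperLookup
{
    // 'services' is an assumption standing in for the DI container.
    public static IDomainEvent Map(IReadOnlyDictionary<Type, object> services, StoredEvent stored)
    {
        // Turn the stored type name back into a Type.
        var eventType = Type.GetType(stored.EventTypeName, throwOnError: true);

        // Close IMapper<,> over the concrete event type at runtime.
        var mapperType = typeof(IMapper<,>).MakeGenericType(typeof(StoredEvent), eventType);

        // Because TDestination is covariant ('out'), a mapper for a concrete
        // event can be used as a mapper that returns IDomainEvent.
        var mapper = (IMapper<StoredEvent, IDomainEvent>)services[mapperType];
        return mapper.Map(stored);
    }
}
```

The covariant cast avoids calling Map through reflection, but it only works because every destination type is a reference type that implements IDomainEvent.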
OPTION 2: Maybe I could have some mapping service, IBetterMapping, such as
    public interface IBetterMapping
    {
        TDestination Map<TDestination>(object source) where TDestination : class;
    }
so that my factory can delegate the concern of knowing how to deserialize anything into TDestination. But I would have the same problem: I don't know how to obtain a type at runtime from a string in order to do something like _myBetterMapper.Map<WhichTypeHere>. There is also the additional problem of implementing that Map method, which I guess would require some registration table that chooses one specific mapper or another based on the type.
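As an illustration of the registration table mentioned above, here is a minimal sketch. It assumes the type is stored as an assembly-qualified name next to the JSON payload. MapperTable and its members are hypothetical names, and the lookup is keyed by Type so no generic argument is needed at the call site.

```csharp
using System;
using System.Collections.Generic;

public interface IDomainEvent { }

// Hypothetical registration table: one deserializing delegate per event type.
public sealed class MapperTable
{
    private readonly Dictionary<Type, Func<string, IDomainEvent>> _mappers =
        new Dictionary<Type, Func<string, IDomainEvent>>();

    // The generic argument is only needed at registration time.
    public MapperTable Register<TEvent>(Func<string, TEvent> map)
        where TEvent : class, IDomainEvent
    {
        _mappers[typeof(TEvent)] = json => map(json);
        return this;
    }

    // At read time only the stored type name and the payload are known.
    public IDomainEvent Map(string assemblyQualifiedTypeName, string json)
    {
        var eventType = Type.GetType(assemblyQualifiedTypeName, throwOnError: true);

        Func<string, IDomainEvent> map;
        if (!_mappers.TryGetValue(eventType, out map))
        {
            throw new InvalidOperationException("No mapper registered for " + eventType.FullName);
        }

        return map(json);
    }
}

// Hypothetical event used only to exercise the table.
public sealed class MyThingUpdatedEvent : IDomainEvent
{
    public MyThingUpdatedEvent(string payload) { Payload = payload; }
    public string Payload { get; }
}
```

Registration would then happen once at startup, for example `new MapperTable().Register(json => new MyThingUpdatedEvent(json))`, and adding an event type means adding a registration rather than touching the event store implementation.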
I am really stuck with this. Hopefully I get some help from you guys! :)
UPDATE: I have implemented my own solution and uploaded the project here in my personal repo: https://gitlab.com/iberodev/DiDrDe.EventStore.Infra.EventStore The solution I went with is to keep the event store wrapper agnostic but to provide custom serializer/deserializer at DI registration for those events that are a bit "special". EventStore allows adding custom metadata headers, so I am using some custom headers to specify concrete implementation types on each data stream so that I know where to deserialize when retrieving the persisted events.
UPDATED ANSWER:
Over time I have come to realize that the whole approach was bad practice. I think a domain event should never have abstract (polymorphic) properties that can take different shapes, because at deserialization time you then cannot know which exact shape the event was serialized with.
The problem is not technical (although for that, my answer below is still valid) but rather philosophical.
I strongly believe that domain events should only use basic types: things that don't change (string, int, maybe some "safe" custom types such as money, etc.). It does not make much sense to have polymorphic domain events. If an event can take different shapes, we are probably talking about different events.
It's important to keep in mind that a very old event (e.g. one raised a year ago) also has to be deserialized when creating a projection (e.g. during a replay, or simply when instantiating an aggregate with event sourcing), and it has to deserialize properly without failing. Imagine the mess if, for whatever reason, somebody modified one of the classes that event was using and the old data could no longer be deserialized into the new class. We would be violating the most fundamental thing in event sourcing.
That's why I think we shouldn't use domain events with complex objects unless we are 100% sure these classes won't change, and we shouldn't use polymorphic domain events at all.
I have implemented a wrapper over the EventStore .NET Client that implements my IEventStore interface and abstracts my client application from anything behind the scenes.
    public interface IEventStore
    {
        Task<IEnumerable<IDomainEvent>> GetEventsAsync(Guid aggregateId, Type aggregateType);
        Task PersistAsync(IAggregateRoot aggregateRoot, IEnumerable<IDomainEvent> domainEvents);
    }
I solved the main serialization/deserialization problem by providing a custom serializer/deserializer for the domain events that are "special" (those with abstract or interface properties, which cannot be deserialized without knowing their specific concrete type). In addition, for each domain event persisted I save metadata headers stating which specific domain event type it is and which specific serializable event type it is.
In other words, the flow goes like this when persisting:
IDomainEvent -> convert to a serializable type (if needed) -> transform in bytes -> save stream data
and when retrieving
Stream Data -> transform to serializable type -> transform to IDomainEvent
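To make the two flows concrete, here is a minimal round-trip sketch. It is not the code from the repository: it uses System.Text.Json, a hypothetical StoredEvent pair of byte arrays in place of the event store client's types, and a hypothetical header name. It skips the "convert to a serializable type" step because the example event only has basic properties.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

public interface IDomainEvent { }

// Immutable event with basic types only; the constructor parameter names
// match the property names so System.Text.Json can rebuild it.
public sealed class ThingRenamed : IDomainEvent
{
    public ThingRenamed(Guid thingId, string newName)
    {
        ThingId = thingId;
        NewName = newName;
    }

    public Guid ThingId { get; }
    public string NewName { get; }
}

// Hypothetical stand-in for the data + metadata pair that gets persisted.
public sealed class StoredEvent
{
    public StoredEvent(byte[] data, byte[] metadata)
    {
        Data = data;
        Metadata = metadata;
    }

    public byte[] Data { get; }
    public byte[] Metadata { get; }
}

public static class EventEnvelope
{
    // Hypothetical header name; any stable key would do.
    private const string EventTypeHeader = "DomainEventClrType";

    // IDomainEvent -> bytes, with the concrete type recorded in the metadata.
    public static StoredEvent ToStored(IDomainEvent domainEvent)
    {
        var eventType = domainEvent.GetType();
        var headers = new Dictionary<string, string>
        {
            [EventTypeHeader] = eventType.AssemblyQualifiedName
        };

        var data = JsonSerializer.SerializeToUtf8Bytes(domainEvent, eventType);
        var metadata = JsonSerializer.SerializeToUtf8Bytes(headers);
        return new StoredEvent(data, metadata);
    }

    // Bytes -> IDomainEvent, using the header to pick the concrete type.
    public static IDomainEvent FromStored(StoredEvent stored)
    {
        var headers = JsonSerializer.Deserialize<Dictionary<string, string>>(stored.Metadata);
        var eventType = Type.GetType(headers[EventTypeHeader], throwOnError: true);
        return (IDomainEvent)JsonSerializer.Deserialize(stored.Data, eventType);
    }
}
```

For a "special" event, the custom mapper would run before ToStored and after FromStored, and a second header would record the serializable type as described above.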
I have uploaded the whole project in my personal repository in GitLab here: https://gitlab.com/iberodev/DiDrDe.EventStore.Infra.EventStore , feel free to have a look and run all the integration and unit tests with xUnit to understand it. And of course feel free to provide any feedback!
The heavy lifting on my solution is in the part of the client that needs to use an event store. It's the responsibility of its infrastructure layer (Autofac registration in its host application) to register EventStore using an Autofac extension and to provide the required custom serializers/deserializers if needed.
That way I can keep the implementation of the EventStore wrapper completely agnostic to specific settings and to specific domain events. It's a generic solution.
The README of the project clarifies this, but basically event store can be registered like this if the domain events are serializable (no abstract properties):
    var builder = new ContainerBuilder(); // Autofac container
    builder
        .RegisterEventStore(
            ctx =>
            {
                var eventStoreOptions =
                    new EventStoreOptions
                    {
                        // No semicolon inside an object initializer.
                        ConnectionString = "ConnectTo=tcp://admin:[email protected]:1113; HeartBeatTimeout=500"
                    };
                return eventStoreOptions;
            });
    var container = builder.Build();
and like this if there are domain events that are special because they have abstract properties:
    var builder = new ContainerBuilder();
    builder
        .RegisterEventStore(
            ctx =>
            {
                var eventStoreOptions =
                    new EventStoreOptions
                    {
                        // No semicolon inside an object initializer.
                        ConnectionString = "ConnectTo=tcp://admin:[email protected]:1113; HeartBeatTimeout=500"
                    };
                return eventStoreOptions;
            },
            ctx =>
            {
                var customDomainEventMappersOptions =
                    new CustomDomainEventMappersOptions()
                        .UsesCustomMappers<FakeDomainEventNotSerializable, FakeSerializableEvent>(
                            // Domain event -> serializable event (used when persisting).
                            domainEvent =>
                            {
                                var mapper =
                                    new FakeDomainEventNotSerializableToFakeSerializableEventMapper();
                                var result = mapper.Map(domainEvent);
                                return result;
                            },
                            // Serializable event -> domain event (used when retrieving).
                            serializableEvent =>
                            {
                                var mapper =
                                    new FakeSerializableEventToFakeDomainEventNotSerializableMapper();
                                var result = mapper.Map(serializableEvent);
                                return result;
                            });
                return customDomainEventMappersOptions;
            });
    var container = builder.Build();