How to implement a saga using a scatter/gather pattern in MassTransit 3.0

Jimmy Bogard describes a McDonald's fast food restaurant here, comparing it to a scatter/gather pattern.

Workflow image from the above article: (image not included)

Initial Implementation Thoughts:

The idea is to have a common interface for all types of FoodOrdered events, which every food station receives. Each station then consumes the orders for its own item, prepares it, and publishes a common done event. For example, the fries and burger stations both get a message about an order of fries; the fries station consumes the order and announces an ItemDoneEvent that the saga is listening for.
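A minimal sketch of that idea in plain C#, with no MassTransit dependency. Apart from ItemDoneEvent, every name here (IFoodOrdered, OrderId, Item, OrderProgress, MarkDone) is a hypothetical chosen for illustration, and OrderProgress only stands in for whatever state the saga instance would keep:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical common contract that every food station would receive.
public interface IFoodOrdered
{
    Guid OrderId { get; }   // correlates the item with its order (and saga instance)
    string Item { get; }    // e.g. "fries" or "burger"
}

// Common completion event; the station publishes it regardless of the food type.
public sealed class ItemDoneEvent
{
    public Guid OrderId { get; set; }
    public string Item { get; set; }
}

// Stand-in for the saga state: it only tracks which items are still outstanding.
public sealed class OrderProgress
{
    private readonly List<string> _pending;

    public OrderProgress(IEnumerable<string> orderedItems)
    {
        _pending = new List<string>(orderedItems);
    }

    // Called once per ItemDoneEvent. Removes one matching pending item
    // (so two orders of fries need two events) and reports whether
    // nothing is left to wait for.
    public bool MarkDone(string item)
    {
        _pending.Remove(item);
        return _pending.Count == 0;
    }
}
```

With an order of fries and a burger, MarkDone("fries") returns false and the following MarkDone("burger") returns true, which is the point at which the saga could consider the order gathered.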

Initial Concerns:

Since the saga doesn't care about the type of food completed, only that all of the food is completed, this would seem to be an OK solution. However, after reading the warnings here regarding sharing of queues, and noticing that Consumer.Conditional filtering has been removed in MassTransit 3.0, it feels as though the framework is saying "Bad Things(TM) will happen" with this type of approach. But I'm not sure how else you would do it without creating a request message, a response message, and a correlating event for each food item in the kitchen, e.g. FriesOrdered, BurgerOrdered, FriesCooked, BurgerCooked. That would be very tedious if you had to do it for every item in the kitchen.

Given the above concerns, what would a good saga example for this type of workflow look like?

Ashtonian asked Nov 07 '15 05:11


1 Answer

I ran into a similar problem: I needed to publish a few dozen commands (all implementing the same interface, IMyRequest) and wait for all of them to finish.

In my case each command starts another saga, which publishes IMyRequestDone at the end of processing without marking itself completed (the child sagas need to be completed at some later time). So instead of storing the number of completed nested sagas in the parent saga, I just query the state of the child saga instances.
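The completion test behind that query can be isolated as a small pure function, which makes it easy to unit test apart from the state machine. This is only a sketch: GatherCheck and AllDone are hypothetical names, and it assumes the three child states used in this answer (Processing, Done and Failed), passed in as plain strings:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class GatherCheck
{
    // True when every expected child request has a stored state
    // and none of them is still "Processing" (Done and Failed both count as finished).
    public static bool AllDone(IReadOnlyCollection<string> childStates, int expectedCount)
    {
        return childStates.Count == expectedCount
            && childStates.All(state => state != "Processing");
    }
}
```

Comparing the count first matters because a child saga that has not been created yet has no state row at all, so "no row is Processing" alone would report completion too early.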

Check on every MyRequestDone message:

    Schedule(() => FailSagaOnRequestsTimeout, x => x.CheckToken, x =>
    {
        // timeout for all requests
        x.Delay = TimeSpan.FromMinutes(10);
        x.Received = e => e.CorrelateById(context => context.Message.CorrelationId);
    });

    During(Active,
        When(Xxx)
            .ThenAsync(async context =>
            {
                // scatter: publish one command per child request
                await context.Publish(context => new MyRequestCommand(context.Instance, "foo"));
                await context.Publish(context => new MyRequestCommand(context.Instance, "bar"));

                // remember the deadline and how many responses to expect
                context.Instance.WaitingMyResponsesTimeoutedAt = DateTime.UtcNow + FailSagaOnRequestsTimeout.Delay;
                context.Instance.WaitingMyResponsesCount = 2;
            })
            .TransitionTo(WaitingMyResponses)
            .Schedule(FailSagaOnRequestsTimeout, context => new FailSagaCommand(context.Instance))
        );

    During(WaitingMyResponses,
        When(MyRequestDone)
            .Then(context =>
            {
                if (context.Instance.WaitingMyResponsesTimeoutedAt < DateTime.UtcNow)
                    throw new TimeoutException();
            })
            .If(context =>
            {
                // gather: query the states of the child sagas that belong to this parent
                var db = serviceProvider.GetRequiredService<DbContext>();
                var requestsStates = db.MyRequestStates.Where(x => x.ParentSagaId == context.Instance.CorrelationId).Select(x => x.State).ToList();
                var allDone = requestsStates.Count == context.Instance.WaitingMyResponsesCount &&
                    requestsStates.All(x => x != nameof(MyRequestStateMachine.Processing)); // assume 3 states of request - Processing, Done and Failed
                return allDone;
            }, x => x
                .Unschedule(FailSagaOnRequestsTimeout)
                .TransitionTo(Active))
            .Catch<TimeoutException>(x => x.TransitionTo(Failed))
    );

    During(WaitingMyResponses,
        When(FailSagaOnRequestsTimeout.Received)
            .TransitionTo(Failed)
    );

Alternatively, periodically check whether all requests are done (the approach described in "Reducing NServiceBus Saga load"):

    Schedule(() => CheckAllRequestsDone, x => x.CheckToken, x =>
    {
        // check interval
        x.Delay = TimeSpan.FromSeconds(15);
        x.Received = e => e.CorrelateById(context => context.Message.CorrelationId);
    });

    During(Active,
        When(Xxx)
            .ThenAsync(async context =>
            {
                // scatter: publish one command per child request
                await context.Publish(context => new MyRequestCommand(context.Instance, "foo"));
                await context.Publish(context => new MyRequestCommand(context.Instance, "bar"));

                // remember the deadline and how many responses to expect
                context.Instance.WaitingMyResponsesTimeoutedAt = DateTime.UtcNow.AddMinutes(10);
                context.Instance.WaitingMyResponsesCount = 2;
            })
            .TransitionTo(WaitingMyResponses)
            .Schedule(CheckAllRequestsDone, context => new CheckAllRequestsDoneCommand(context.Instance))
        );

    During(WaitingMyResponses,
        When(CheckAllRequestsDone.Received)
            .Then(context =>
            {
                // gather: query the states of the child sagas that belong to this parent
                var db = serviceProvider.GetRequiredService<DbContext>();
                var requestsStates = db.MyRequestStates.Where(x => x.ParentSagaId == context.Instance.CorrelationId).Select(x => x.State).ToList();
                var allDone = requestsStates.Count == context.Instance.WaitingMyResponsesCount &&
                    requestsStates.All(x => x != nameof(MyRequestStateMachine.Processing));
                if (!allDone)
                {
                    // give up if the deadline would pass before the next check
                    if (context.Instance.WaitingMyResponsesTimeoutedAt < DateTime.UtcNow + CheckAllRequestsDone.Delay)
                        throw new TimeoutException();
                    throw new NotAllDoneException();
                }
            })
            .TransitionTo(Active)
            // not finished yet: schedule the next check
            .Catch<NotAllDoneException>(x => x.Schedule(CheckAllRequestsDone, context => new CheckAllRequestsDoneCommand(context.Instance)))
            .Catch<TimeoutException>(x => x.TransitionTo(Failed)));
smg answered Sep 24 '22 02:09