Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

MassTransit - Wait for all activities to complete and then continue processing

If I have to much activities, does it cause blocking resources or request time out?

Here is my scenario:

I have an api controller which sends an Order request to consumer; I use Request/Response patern to recieve ErrorMessage property from consumer and base on that property response back, if it's null I would want to return OK() otherwise, return BadRequest or Ok but with a message like: Product out of stock to notify to the client.

In my consumer, I have build a routing slip which have 2 activities:

  • CreateOrderActivity: Which creates an order with order details.
  • ReserveProductActivity: Which reduces the quantity of product in stock, if product quantity < 0 I'll publish a message with an ErrorMessage back to the consumer and compensate the previous activity.

    public async Task Consume(ConsumeContext<ProcessOrder> context)
    {
        try
        {
            if (!string.IsNullOrEmpty(context.Message.ErrorMessage))
            {
                await context.RespondAsync<OrderSubmitted>(new
                {
                    context.Message.OrderId,
                    context.Message.ErrorMessage
                });
    
                return;
            }
    
            RoutingSlipBuilder builder = new RoutingSlipBuilder(context.Message.OrderId);
            // get configs
            var settings = new Settings(_configuration);
    
            // Add activities
            builder.AddActivity(settings.CreateOrderActivityName, settings.CreateOrderExecuteAddress);
            builder.SetVariables(new { context.Message.OrderId, context.Message.Address, context.Message.CreatedDate, context.Message.OrderDetails });
    
            builder.AddActivity(settings.ReserveProductActivityName, settings.ReserveProductExecuteAddress);
            builder.SetVariables(new { context.Message.OrderDetails });
    
    
            await context.Execute(builder.Build());
    
            await context.RespondAsync<OrderSubmitted>(new
            {
                context.Message.OrderId
            });
    
        }
        catch (Exception ex)
        {
            _log.LogError("Can not create Order {OrderId}", context.Message.OrderId);
            throw new Exception(ex.Message);
        }
    }
    

Code for ReserveProductActivity:

    public async Task<ExecutionResult> Execute(ExecuteContext<ReserveProductArguments> context)
    {
        var orderDetails = context.Arguments.OrderDetails;

        foreach (var orderDetail in orderDetails)
        {
            var product = await _productRepository.GetByProductId(orderDetail.ProductId);
            if (product == null) continue;

            var quantity = product.SetQuantity(product.QuantityInStock - orderDetail.Quantity);


            if (quantity < 0)
            {
                var errorMessage = "Out of stock.";
                await context.Publish<ProcessOrder>(new
                {
                    ErrorMessage = errorMessage
                });
                throw new RoutingSlipException(errorMessage);
            }

            await _productRepository.Update(product);
        }

        return context.Completed(new Log(orderDetails.Select(x => x.ProductId).ToList()));
    }

This line of code in a consumer method await context.Execute(builder.Build())

At first I thought it would build the routing slip and execute all activities first before going to the next line but it's not. Instead it's immediately going to the next line of code (which responses back to controller) and then after execute activities, which is not what I want. I need to check the quantity of product in 2nd activity first and base on that return back to the controller.

(In current, it always responses back to controller first - the line after buider.Buid(), and then if quantity < 0 it still goes to the very first if condition of the consume method but since it already responses, I cannot trigger response inside that if statement again).

So in short, if product is still available in 2nd activity I can send the reponse back like normal (which executes the code after context.Execute(builder.Build()), but if quantity < 0 - which I publish back to the consumer method with ErrorMessage, I would like it to jump to the very first if condition of Consume method (if(!string.IsNullOrEmpty(context.Message.ErrorMessage)) ...) and base on the ErrorMessage notify the client.

Is there something wrong with this approach? How can I achieve something like this?

Thanks

like image 853
mkzpizza Avatar asked Nov 06 '22 12:11

mkzpizza


1 Answers

It isn't documented, but it is possible to use a proxy to execute a routing slip, and response to the request with the result of the routing slip. You can see the details in the unit tests:

https://github.com/MassTransit/MassTransit/blob/master/tests/MassTransit.Tests/Courier/RequestRoutingSlip_Specs.cs#L20

You could create the proxy, which builds the routing slip and executes it, and the response proxy - both of which are then configured on a receive endpoint as .Instance consumers.

  class RequestProxy :
        RoutingSlipRequestProxy<Request>
    {
        protected override void BuildRoutingSlip(RoutingSlipBuilder builder, ConsumeContext<Request> request)
        {
        // get configs
        var settings = new Settings(_configuration);

        // Add activities
        builder.AddActivity(settings.CreateOrderActivityName, settings.CreateOrderExecuteAddress);
        builder.SetVariables(new { context.Message.OrderId, context.Message.Address, context.Message.CreatedDate, context.Message.OrderDetails });

        builder.AddActivity(settings.ReserveProductActivityName, settings.ReserveProductExecuteAddress);
        builder.SetVariables(new { context.Message.OrderDetails });
        }
    }


    class ResponseProxy :
        RoutingSlipResponseProxy<Request, Response>
    {
        protected override Response CreateResponseMessage(ConsumeContext<RoutingSlipCompleted> context, Request request)
        {
            return new Response();
        }
    }

You could then call it from the consumer, or put the ordering logic in the proxy - whichever makes sense, and then use the request client from your controller to send the request and await the response.

like image 55
Chris Patterson Avatar answered Nov 15 '22 05:11

Chris Patterson