
How to write a MassTransit Json Deserializer for Azure Services

Here is how I am publishing an object to Event Grid. I want to be able to use Azure Service Bus to listen to it.

    public void Publicar<T>(T model, string operation, string entity)
    {
        _nomeEvento = entity + operation;

        Boolean.TryParse(Configuration["EventGridConfig:Enabled"], out var eventGridIsActive);
        if (!eventGridIsActive)
            return;

        var primaryTopicKey = Configuration["EventGridConfig:AcessKey"];
        var primaryTopic = Configuration["EventGridConfig:Endpoint"];

        var primaryTopicHostname = new Uri(primaryTopic).Host;

        var topicCredentials = new TopicCredentials(primaryTopicKey);
        var client = new EventGridClient(topicCredentials);

        client.PublishEventsAsync(primaryTopicHostname, GetEventsList(model)).GetAwaiter().GetResult();
    }

    private List<EventGridEvent> GetEventsList<T>(T model)
    {
        return new List<EventGridEvent>
        {
            new EventGridEvent()
            {
                Id = Guid.NewGuid().ToString(),
                EventType = _nomeEvento,
                Data = model,
                EventTime = DateTime.Now,
                Subject = "MS_Clientes",
                DataVersion = "1.0",
            }
        };
    }

Here is how I am connecting to the Service Bus:

    static class CustomExtensionsMethods
{
    public static IServiceCollection AddBus(this IServiceCollection services, IConfiguration configuration,
        IHostingEnvironment env)
    {
        services.AddMassTransit(x => { x.AddConsumer<NomeEmailChangeConsumer>(); });
        services.AddSingleton(provider => Bus.Factory.CreateUsingAzureServiceBus(cfg =>
        {
            var keyName = "RootManageSharedAccessKey";
            var busName = configuration["ServiceBus:Name"];
            var secret = configuration["ServiceBus:Secret"];
            var host = cfg.Host(
                "Endpoint=sb://" + busName + ".servicebus.windows.net/;" +
                "SharedAccessKeyName=" + keyName + ";" +
                "SharedAccessKey=" + secret,
                z =>
                {
                    TokenProvider
                        .CreateSharedAccessSignatureTokenProvider(keyName, secret);
                });
            cfg.ConfigureJsonSerializer(settings =>
            {
                settings.Converters.Add(new InterfaceConverter());

                return settings;
            });
            cfg.UseExtensionsLogging(provider.GetService<ILoggerFactory>());
            cfg.ReceiveEndpoint(host, configuration["ServiceBus:Topic"],
                e => { e.Consumer<NomeEmailChangeConsumer>(provider); });
        }));
        services.AddSingleton<IPublishEndpoint>(provider => provider.GetRequiredService<IBusControl>());
        services.AddSingleton<ISendEndpointProvider>(provider => provider.GetRequiredService<IBusControl>());
        services.AddSingleton<IBus>(provider => provider.GetRequiredService<IBusControl>());
        services.AddScoped(provider => provider.GetRequiredService<IBus>().CreateRequestClient<NomeEmailChange>());
        services.AddSingleton<IHostedService, BusService>();
        return services;
    }
}

But then I get this error:

    fail: MassTransit.Messages[0]
      R-FAULT sb://dev.servicebus.windows.net/bff-queue 9ade19ec-238c-4c08-8e03-28bac695ea7b No deserializer was registered for the message content type: application/json; charset=utf-8. Supported content types include application/vnd.masstransit+json, application/vnd.masstransit+bson, application/vnd.masstransit+xml
System.Runtime.Serialization.SerializationException: No deserializer was registered for the message content type: application/json; charset=utf-8. Supported content types include application/vnd.masstransit+json, application/vnd.masstransit+bson, application/vnd.masstransit+xml
   at MassTransit.Serialization.SupportedMessageDeserializers.Deserialize(ReceiveContext receiveContext)
   at MassTransit.Pipeline.Filters.DeserializeFilter.Send(ReceiveContext context, IPipe`1 next)
   at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)

I have tried adding a JsonConverter I found online, but no luck:

    public class InterfaceConverter : JsonConverter
    {
        public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
        {
            serializer.Serialize(writer, value);
        }

        public override object ReadJson(JsonReader reader, Type objectType, object existingValue,
            JsonSerializer serializer)
        {
            // Set TypeNameHandling to Auto for deserializing objects with $type
            // Should be set directly in ConfigureJsonDeserializer when setting up MT Service bus
            serializer.TypeNameHandling = TypeNameHandling.Auto;
            return serializer.Deserialize(reader);
        }

        public override bool CanConvert(Type objectType)
        {
            return objectType.IsInterface;
        }
    }
CESCO asked Nov 29 '19 15:11


2 Answers

I ran a couple of tests and came up with a working solution. In my test case, I redirect messages from an Event Grid topic to a Service Bus queue, which is what you are doing, if I understood correctly.

Since MassTransit requires messages to be in a certain format in order to interpret them, we need the following:

  1. A custom deserializer for Event Grid messages, which are of type EventGridEvent
  2. A ContentType on every message that MassTransit has to consume; without it, nothing will work

I therefore built an example that works if you are redirecting messages from Event Grid, and also if you send messages directly to the Service Bus. The following code shows how to implement a deserializer for Event Grid messages:

    public class EventGridMessgeDeserializer : IMessageDeserializer
    {
        private string _contentType;

        public EventGridMessgeDeserializer(string contentType)
        {
            _contentType = contentType;
        }
        public ContentType ContentType => new ContentType(_contentType);

        public ConsumeContext Deserialize(ReceiveContext receiveContext)
        {
            var body = Encoding.UTF8.GetString(receiveContext.GetBody());
            var customMessage = JsonConvert.DeserializeObject<EventGridEvent>(body);
            var serviceBusSendContext = new AzureServiceBusSendContext<EventGridEvent>(customMessage, CancellationToken.None);

            // MassTransit's message type URN scheme; it has to match for messages to be processed.
            // Event Grid messages are of type EventGridEvent in the namespace Microsoft.Azure.EventGrid.Models.
            string[] messageTypes = { "urn:message:Microsoft.Azure.EventGrid.Models:EventGridEvent" };
            var serviceBusContext = receiveContext as ServiceBusReceiveContext;
            serviceBusSendContext.ContentType = new ContentType(JsonMessageSerializer.JsonContentType.ToString());
            serviceBusSendContext.SourceAddress = serviceBusContext.InputAddress;
            serviceBusSendContext.SessionId = serviceBusContext.SessionId;

            // sending JToken because we are using default Newtonsoft deserializer/serializer
            var messageEnv = new JsonMessageEnvelope(serviceBusSendContext, JObject.Parse(body), messageTypes);
            return new JsonConsumeContext(JsonSerializer.CreateDefault(), receiveContext, messageEnv);
        }

        public void Probe(ProbeContext context)
        {
        }
    }

The important part here is that the custom deserializer specifies the message type(s). MassTransit expects messages in its own format and ignores those that do not comply, so this is where we supply the type information it requires:

string[] messageTypes = { "urn:message:Microsoft.Azure.EventGrid.Models:EventGridEvent" };

This follows MassTransit's URN scheme (urn:message:Namespace:TypeName) and has to match for messages to be processed.

The full code is available on GitHub: https://github.com/kgalic/MassTransitSample
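For the deserializer to be used, it also has to be registered on the bus for the incoming content type. The following is only a minimal sketch of that wiring, not necessarily how the linked repository does it. It assumes the `AddMessageDeserializer` method on MassTransit's bus factory configurator and that deserializers are looked up by media type; `EventGridEventConsumer` and `BusConfigurationSketch` are hypothetical names introduced for illustration.

```csharp
using System;
using System.Net.Mime;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Azure.EventGrid.Models;

// Hypothetical consumer: receives the EventGridEvent produced by the custom deserializer.
public class EventGridEventConsumer : IConsumer<EventGridEvent>
{
    public Task Consume(ConsumeContext<EventGridEvent> context)
    {
        // context.Message.Data carries the model that was published to Event Grid.
        Console.WriteLine($"{context.Message.EventType}: {context.Message.Subject}");
        return Task.CompletedTask;
    }
}

public static class BusConfigurationSketch
{
    // Intended to be called inside Bus.Factory.CreateUsingAzureServiceBus(cfg => { ... }).
    public static void UseEventGridDeserializer(IBusFactoryConfigurator cfg)
    {
        // Assumption: this is the media type of the incoming messages
        // (the error in the question reports "application/json; charset=utf-8").
        const string contentType = "application/json";

        cfg.AddMessageDeserializer(
            new ContentType(contentType),
            () => new EventGridMessgeDeserializer(contentType));
    }
}
```

The consumer is then attached to the receive endpoint in the usual way, for example `e.Consumer<EventGridEventConsumer>()`.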

Side note: if you are sending messages directly to the Service Bus queue and want to deserialize them, you need to set the ContentType, as mentioned above:

var message = new Message(UTF8Encoding.UTF8.GetBytes(request));
message.ContentType = "application/json"; //must have
await _senderClient.SendAsync(message);

In that case, you need to write a deserializer for your own message type, similar to the one for EventGridEvent, which you can use as a template.

kgalic answered Dec 21 '22 22:12


MassTransit wraps messages in a message envelope, as documented. Event Grid messages aren't in that format, hence the error.

The JsonSerializer settings that you've set are used, but the object expected on deserialization is a MessageEnvelope.

I think you have two ways to work around this:

  1. Create and use a custom deserializer (similar to JsonMessageDeserializer) that deserializes into a simple JObject or Message

    UPDATE: On trying this further, it seems a lot more complex than I had originally thought. As Richard mentions in the other answer, you are probably better off using the Azure Service Bus client itself (or even a generic AMQP client if required).

  2. Trigger an Azure Function or Logic App that wraps the Event Grid payload in a message envelope, so that MassTransit can deserialize it, and then sends it to the Service Bus queue
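The wrapping step in option 2 could look roughly like the sketch below. It is an illustration only: it assumes MassTransit accepts a minimal envelope containing just `messageType` and `message`, it uses `System.Text.Json.Nodes` (.NET 6 or later) rather than Newtonsoft, and `EnvelopeWrapper` and its members are hypothetical names. The Azure Function trigger and the Service Bus send are left out.

```csharp
using System;
using System.Text.Json.Nodes;

public static class EnvelopeWrapper
{
    // One of the content types listed as supported in the error message from the question.
    public const string MassTransitJsonContentType = "application/vnd.masstransit+json";

    // Wraps a raw Event Grid event (JSON text) in a minimal MassTransit-style envelope.
    // messageTypeUrn must match the message contract the consumer expects,
    // e.g. "urn:message:Microsoft.Azure.EventGrid.Models:EventGridEvent".
    public static string Wrap(string eventGridJson, string messageTypeUrn)
    {
        JsonNode payload = JsonNode.Parse(eventGridJson)
            ?? throw new ArgumentException("Body is not a JSON value.", nameof(eventGridJson));

        var envelope = new JsonObject
        {
            // Assumption: these two fields are enough for MassTransit to deserialize.
            ["messageType"] = new JsonArray(JsonValue.Create(messageTypeUrn)),
            ["message"] = payload
        };

        return envelope.ToJsonString();
    }
}
```

The function would then send the returned string as the Service Bus message body and set the message's ContentType to `EnvelopeWrapper.MassTransitJsonContentType`, so that MassTransit's own JSON deserializer picks it up.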

PramodValavala-MSFT answered Dec 22 '22 00:12