Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to register a generic consumer adapter in MassTransit if I have a list of message types

I am successfully using MassTransit for a silly sample application where I publish an message (an event) from a Publisher console application and I receive it at two different consumers which are also console applications using RabbitMq.

This is the whole sample project git repo: https://gitlab.com/DiegoDrivenDesign/DiDrDe.MessageBus

I want to have a project wrapping MassTransit functionality so that my Publisher and Consumers projects know nothing about MassTransit. The dependencies should go in this direction:

  • DiDrDe.MessageBus ==> MassTransit
  • DiDrDe.MessageBus ==> DiDrDe.Contracts
  • DiDrDe.Model ==> DiDrDe.Contracts
  • DiDrDe.Publisher ==> DiDrDe.MessageBus
  • DiDrDe.Publisher ==> DiDrDe.Contracts
  • DiDrDe.Publisher ==> DiDrDe.Model
  • DiDrDe.ConsumerOne ==> DiDrDe.Contracts
  • DiDrDe.ConsumerOne ==> DiDrDe.MessageBus
  • DiDrDe.ConsumerOne ==> DiDrDe.Model
  • DiDrDe.ConsumerTwo ==> DiDrDe.Contracts
  • DiDrDe.ConsumerTwo ==> DiDrDe.MessageBus
  • DiDrDe.ConsumerTwo ==> DiDrDe.Model

Notice how DiDrDe.MessageBus does not know anything about DiDrDe.Model because it's a generic project that should be valid for any message type.

To achieve this I am implementing the adapter pattern so that my custom interfaces IEventDtoBus (to publish events) and IEventDtoHandler<TEventDto> (to consume events) are all my Publisher and Consumers know. The MassTransit wrapper project (called DiDrDe.MessageBus) implements the adapters with an EventDtoBusAdapter composed of an IEventDtoBus and a EventDtoHandlerAdapter<TEventDto> as my only generic IConsumer<TEventDto> composed of an IEventDtoHandler<TEventDto>

The problem I have is with the way MassTransit requires the consumers to be registered because my consumer is a generic one and its type should not be known by the MassTransit wrapper at compile time.

I need to find a way to register the EventDtoHandlerAdapter<TEventDto> as a consumer for each TEventDto type I pass at runtime (as a collection of types for example). Please see my repository for all the details.

MassTransit supports an overload method that accepts a type (good! just what I want) but it also requires a second argument Func<type, object> consumerFactory and I don't know how to implement it.

UPDATE 1: The problem is that I cannot register this generic consumer like:

consumer.Consumer<EventDtoHandlerAdapter<ThingHappened>>();

because I get a compilation error

Severity Code Description Project File Line Suppression State Error CS0310 'EventDtoHandlerAdapter' must be a non-abstract type with a public parameterless constructor in order to use it as parameter 'TConsumer' in the generic type or method 'ConsumerExtensions.Consumer(IReceiveEndpointConfigurator, Action>)' DiDrDe.MessageBus C:\src\DiDrDe.MessageBus\DiDrDe.MessageBus\IoCC\Autofac\RegistrationExtensions.cs

UPDATE 2: I have tried several things and I have updated the project on my repo. These are my attempts in the MassTransit wrapper project. Notice how I am able to have everything working if I add a dependency to each message (event) I want to handle. But I don't want that.. I don't want this project to know ANYTHING about the messages it can handle. If only I could register consumers knowing only a message type..

cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>
{
    //THIS WORKS
    var eventDtoHandler = context.Resolve<IEventDtoHandler<ThingHappened>>();
    consumer.Consumer(() => new EventDtoHandlerAdapter<ThingHappened>(eventDtoHandler));

    // DOES NOT WORK
    //var typeEventDtoHandler = typeof(IEventDtoHandler<>).MakeGenericType(typeof(ThingHappened));
    //var eventDtoHandler = context.Resolve(typeEventDtoHandler);
    //consumer.Consumer(eventDtoHandler);

    // DOES NOT WORK
    //consumer.Consumer<EventDtoHandlerAdapter<ThingHappened>>(context);

    // DOES NOT WORK
    //var consumerGenericType = typeof(IConsumer<>);
    //var consumerThingHappenedType = consumerGenericType.MakeGenericType(typeof(ThingHappened));
    //consumer.Consumer(consumerThingHappenedType,  null);
});

UPDATE 3: Following Igor's advice, I try to do the following:

var adapterType = typeof(EventDtoHandlerAdapter<>).MakeGenericType(typeof(ThingHappened));
                            consumer.Consumer(adapterType, (Type x) => context.Resolve(x));

but I get a runtime error saying that

The requested service 'DiDrDe.MessageBus.EventDtoHandlerAdapter`1[[DiDrDe.Model.ThingHappened, DiDrDe.Model, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null]]' has not been registered. To avoid this exception, either register a component to provide the service, check for service registration using IsRegistered(), or use the ResolveOptional() method to resolve an optional dependency.

I have even tried to register EventDtoHandlerAdapter<> separately as a IConsumer in case that was the issue but no luck.

builder
    .RegisterGeneric(typeof(EventDtoHandlerAdapter<>))
    .As(typeof(IConsumer<>))
    .SingleInstance();

Also with:

builder
  .RegisterType<EventDtoHandlerAdapter<ThingHappened>>()
  .AsSelf();

and it tells me that

System.ObjectDisposedException: 'This resolve operation has already ended. When registering components using lambdas, the IComponentContext 'c' parameter to the lambda cannot be stored. Instead, either resolve IComponentContext again from 'c', or resolve a Func<> based factory to create subsequent components from

Just to clarify, The only consumer I need to register is my EventDtoHandlerAdapter<TEventDto>. It is generic, so essentially it will exist a registration per TEventDto that I support. The thing is that I don't need the types in advance so I need to operate with types.

UPDATE 4: A new attempt as suggested by Igor. This time with the "proxy". I have updated my repo with the last attempt for all details. I have my consumer and marker interface:

public interface IEventDtoHandler
{
}

public interface IEventDtoHandler<in TEventDto>
    : IEventDtoHandler
        where TEventDto : IEventDto
{
    Task HandleAsync(TEventDto eventDto);
}

and I have my own implementation of a consumer that knows nothing about MassTransit:

public class ThingHappenedHandler
    : IEventDtoHandler<ThingHappened>
{
    public Task HandleAsync(ThingHappened eventDto)
    {
        Console.WriteLine($"Received {eventDto.Name} " +
                            $"{eventDto.Description} at consumer one that uses an IEventDtoHandler");
        return Task.CompletedTask;
    }
}

Now my "wrapper consumer" is what I call the adapter because it knows about MassTransit (it implements MassTransit IConsumer).

public class EventDtoHandlerAdapter<TConsumer, TEventDto>
    : IConsumer<TEventDto>
        where TConsumer : IEventDtoHandler<TEventDto>
        where TEventDto : class, IEventDto
{
    private readonly TConsumer _consumer;

    public EventDtoHandlerAdapter(TConsumer consumer)
    {
        _consumer = consumer;
    }

    public async Task Consume(ConsumeContext<TEventDto> context)
    {
        await _consumer.HandleAsync(context.Message);
    }
}

Now the last step is to register my "wrapper consumer" with MassTransit. But since it's generic I don't know how to do it. This is the issue.

I can have all my consumer types scanned and registered in Autofac as suggested with:

var interfaceType = typeof(IEventDtoHandler);
var consumerTypes =
    AppDomain.CurrentDomain.GetAssemblies()
        .SelectMany(x => x.GetTypes())
        .Where(x => interfaceType.IsAssignableFrom(x)
                    && !x.IsInterface
                    && !x.IsAbstract)
        .ToList();

So now I have all the consumer types (all the implementations of IEventDtoHandler, including my ThingHappenedHandler). What now? How to register it?

Something like the following does not work:

foreach (var consumerType in consumerTypes)
{
    consumer.Consumer(consumerType, (Type x) => context.Resolve(x));
}

But I guess it's normal that it doesn't work because what I want to register is my EventDtoHandlerAdapter, which is the real IConsumer.

So, I think I didn't understand your suggestion. Sorry!

What I need is something like this:

//THIS WORKS
var eventDtoHandler = context.Resolve<IEventDtoHandler<ThingHappened>>();
consumer.Consumer(() => new EventDtoHandlerAdapter<IEventDtoHandler<ThingHappened>, ThingHappened>(eventDtoHandler));

But without using the ThingHappened model because the model should not be known. Here is where I remain stuck

UPDATE 5: New attempt as suggested by Chris Patterson (his solution was merged into master on my repo), but the problem remains.

To clarify, DiDrDe.MessageBus must be agnostic of any publisher, consumer and model. It should only depend on MassTransit and DiDrDe.Contracts, and Chris' solution has a line like:

cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>
{
    consumer.Consumer<EventDtoHandlerAdapter<ThingHappened>>(context);
});

That has a direct dependency on ThingHappened model. This is not allowed and it's actually no much different from the solution I already had which was:

cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>
{
    //THIS works, but it uses ThingHappened explicitly and I don't want that dependency
    var eventDtoHandler = context.Resolve<IEventDtoHandler<ThingHappened>>();
    consumer.Consumer(() => new EventDtoHandlerAdapter<ThingHappened>(eventDtoHandler));
});

Sorry if this was not clear enough, but DiDrDe.MessageBus will eventually be a nuGet package shared between many different consumer and publisher projects and it should not have any dependency to any specific message/model.

UPDATE 6: The question has been resolved. Thanks a lot to Igor and Chris for their time and help. I have pushed the solution to master on my repo.

PS: Unfortunately when I have two handlers in the same consumer handling the same Event, this solution has its limitations because it seems only one of the handlers is being executed (twice). I would expect both handlers to be executed or only one, but only once (not twice). But this is another subject already :)

like image 500
diegosasw Avatar asked Oct 14 '18 17:10

diegosasw


People also ask

How do I configure a MassTransit consumer?

There are many ways to configure MassTransit consumers and register them. A nice way I've found is to create an extension class and setup MassTransit configuration. In this post I'll show how to register a consumer using the AddConsumer method and have a separate class for the consumer definition so I can configure it individually.

What is MassTransit’s policy on namespaces?

MassTransit uses the full type name, including the namespace, for message contracts. When creating the same message type in two separate projects, the namespaces must match or the message will not be consumed. An example message to update a customer address is shown below.

What are the different types of mass transit consumers?

MassTransit includes many consumer types, including consumers, sagas, saga state machines, routing slip activities, handlers, and job consumers. A consumer, which is the most common consumer type, is a class that consumes one or more messages types.

Why make the commandconsumerdefinition generic in MassTransit?

Making the CommandConsumerDefinition generic allowed me to use reflection to construct both types in the same way at runtime. This allowed MassTransit to wire up the configuration in the expected manner.


1 Answers

Take a look at custom consumer convention here: https://github.com/MassTransit/MassTransit/tree/develop/src/MassTransit.Tests/Conventional

Create your own IMyConsumerInterface, IMyMessageInterface plug it in the code from that test. Register it ConsumerConvention.Register<CustomConsumerConvention>(); before creating the bus. It should work.

Additionally you can create your own wrapper around the consumer context and have that passed together with the message.

LoadFrom (MassTransit.AutofacIntegration) didn't work for me with the custom convention so I had to manually register consumers

foreach (var consumer in consumerTypes)
  cfg.Consumer(consumer, (Type x) => _container.Resolve(x));

Alternatively if you want to use "proxy" approach do something like:

public class WrapperConsumer<TConsumer, TMessage> : IConsumer<TMessage>
    where TMessage : class, IMyMessageInterface 
    where TConsumer : IMyConsumerInterface<TMessage>
{
    private readonly TConsumer _consumer;

    public WrapperConsumer(TConsumer consumer)
    {
        _consumer = consumer;
    }

    public Task Consume(ConsumeContext<TMessage> context)
    {
        return _consumer.Consume(context.Message);
    }
}

...

// create wrapper registrations
cfg.Consumer(() => new WrapperConsumer<MyConsumer, MyMessage>(new MyConsumer()));


additional code applicable to both approaches
// marker interface
public interface IMyConsumerInterface
{
}

public interface IMyConsumerInterface<T> : IMyConsumerInterface
    where T : IMyMessageInterface 
{
    Task Consume(T message);
}

...

builder.RegisterAssemblyTypes(ThisAssembly)
           .AssignableTo<IMyConsumerInterface>()
           .AsSelf()
           .As<IMyConsumerInterface>();

...

var interfaceType = typeof(IMyConsumerInterface);
var consumerTypes = AppDomain.CurrentDomain.GetAssemblies().SelectMany(x => x.GetTypes())
    .Where(x => interfaceType.IsAssignableFrom(x) && !x.IsInterface && !x.IsAbstract)
    .ToList();


RE: UPDATE 5
builder.Register(context =>
{
    var ctx = context.Resolve<IComponentContext>();
    var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        cfg.ReceiveEndpoint(host, messageBusOptions.QueueName, consumer =>
        {
            foreach (var adapterType in adapterTypes)
                consumer.Consumer(adapterType, (Type type) => ctx .Resolve(adapterType));
        });
    });
    return busControl;
})
like image 102
Igor Dražić Avatar answered Oct 26 '22 10:10

Igor Dražić