I'm looking at MassTransit as a ServiceBus implementation to use in a web project.
I am experimenting with the Request/Response pattern and am seeing a long delay between the consumer receiving the message and responding, and the request publisher handling the response. Sometimes the response never seems to arrive at all (after leaving it running for 10 minutes, it still had not come through). The only times I have seen the handler delegate called with the response are after the 30-second timeout period has elapsed and the timeout exception has been thrown; in that situation, the breakpoint set on the handler delegate is hit.
The setup is a standard affair: a web app publishes requests, and a console app consumes them and sends responses, which the web app then handles in the callback.
I'm using Castle Windsor, and the container is initialized in the web project using WebActivator:
[assembly: WebActivator.PreApplicationStartMethod(typeof(BootStrapper), "PreStart")]
[assembly: WebActivator.PostApplicationStartMethod(typeof(BootStrapper), "PostStart")]
[assembly: WebActivator.ApplicationShutdownMethodAttribute(typeof(BootStrapper), "Stop")]
namespace Web.App_Start
{
    public static class BootStrapper
    {
        internal static IWindsorContainer Container { get; private set; }
        public static void PreStart()
        {
            Container = new WindsorContainer().Install(FromAssembly.This());
        }
        public static void PostStart()
        {
            FilterConfig.RegisterGlobalFilters(GlobalFilters.Filters);
            RouteConfig.RegisterRoutes(RouteTable.Routes);
            BundleConfig.RegisterBundles(BundleTable.Bundles);
            ApiConfig.Configure(Container);
            MvcConfig.Configure(Container);
        }
        public static void Stop()
        {
            if (Container != null)
                Container.Dispose();
        }
    }
}
In the web app project (an ASP.NET Web API project), the WindsorInstaller for MassTransit looks like
public class MassTransitInstaller : IWindsorInstaller
{
    public void Install(IWindsorContainer container, IConfigurationStore store)
    {
        container.Register(AllTypes.FromThisAssembly().BasedOn<IConsumer>());
        var bus = ServiceBusFactory.New(configurator =>
        {
            configurator.UseMsmq();
            configurator.VerifyMsmqConfiguration();
            configurator.UseMulticastSubscriptionClient();
            configurator.ReceiveFrom("msmq://localhost/web");
            configurator.EnableMessageTracing();
            configurator.Subscribe(x => x.LoadFrom(container));
        });
        container.Register(Component.For<IServiceBus>().Instance(bus));
    }
}
In the console app project, the WindsorInstaller looks like
public class MassTransitInstaller : IWindsorInstaller
{
    public void Install(IWindsorContainer container, IConfigurationStore store)
    {
        container.Register(AllTypes.FromAssemblyContaining<BasicRequestCommandHandler>().BasedOn<IConsumer>());
        var bus = ServiceBusFactory.New(configurator =>
        {
            configurator.UseMsmq();
            configurator.VerifyMsmqConfiguration();
            configurator.UseMulticastSubscriptionClient();
            configurator.ReceiveFrom("msmq://localhost/console");
            configurator.Subscribe(x => x.LoadFrom(container));
        });
        container.Register(Component.For<IServiceBus>().Instance(bus));
    }
}
I have an ApiController with the following GET action method:
public class HelloController : ApiController
{
    private readonly IServiceBus _bus;
    public HelloController(IServiceBus bus)
    {
        _bus = bus;
    }
    // GET api/hello?text={some text}
    public Task<IBasicResponseCommand> Get(string text)
    {
        var command = new BasicRequestCommand {Text = text};
        var tcs = new TaskCompletionSource<IBasicResponseCommand>();
        _bus.PublishRequest(command, c =>
        {
            // Complete the task when the correlated response arrives.
            c.Handle<IBasicResponseCommand>(r =>
            {
                tcs.SetResult(r);
            });
        });
        return tcs.Task;
    }
}
BasicRequestCommand and BasicResponseCommand look like so
public interface IBasicRequestCommand
{
    Guid CorrelationId { get; set; }
    string Text { get; set; }
}
public class BasicRequestCommand :
    CorrelatedBy<Guid>, IBasicRequestCommand
{
    public Guid CorrelationId { get; set; }
    public string Text { get; set; }
    public BasicRequestCommand()
    {
        CorrelationId = Guid.NewGuid();
    }
}
public interface IBasicResponseCommand
{
    Guid CorrelationId { get; set; }
    string Text { get; set; }
}
public class BasicResponseCommand :
    CorrelatedBy<Guid>, IBasicResponseCommand
{
    public Guid CorrelationId { get; set; }
    public string Text { get; set; }
}
And the handler responding to the BasicRequestCommand in the console app:
public class BasicRequestCommandHandler : Consumes<IBasicRequestCommand>.Context
{
    public void Consume(IConsumeContext<IBasicRequestCommand> context)
    {
        Console.Out.WriteLine("received message text " + context.Message.Text);
        context.Respond(new BasicResponseCommand { Text = "Hello " + context.Message.Text, CorrelationId = context.Message.CorrelationId });
    }
}
With all of this running locally, I was expecting the request/response round trip to take a few seconds at most. Am I missing something in the configuration?
In addition, I want to hook MassTransit up to log4net. I am using Windsor's log4net logging facility and have a log4net section in web.config. This all works fine for the ILogger implementations provided by Windsor (and also for NHibernate logging), but it's not clear from the documentation how to configure MassTransit to use it for logging. Any ideas?
As Andrei Volkov and Chris Patterson discussed on the MassTransit Google group, this issue seems to stem from MassTransit switching to using SynchronizationContext, which for some reason does not work as expected.
For the time being, one workaround seems to be to switch to async MassTransit requests, or to go back to v2.1.1, which does not use the offending SynchronizationContext.
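Separately from the root cause, the controller in the question returns a Task that stays pending forever if the response handler is never invoked, which matches the "never comes through" symptom. Below is a minimal sketch of guarding a TaskCompletionSource with a timeout so the caller always gets a result or an error. It deliberately does not use MassTransit: SendRequest is a hypothetical stand-in for any callback-based request API, the names CallbackRequestDemo and RequestAsync are made up for illustration, and .NET 4.5 or later is assumed.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a callback-based request API (NOT MassTransit's API).
public static class CallbackRequestDemo
{
    // Simulates a request whose response callback fires after `delay`,
    // or never fires at all when `delay` is null.
    public static void SendRequest(string text, TimeSpan? delay, Action<string> onResponse)
    {
        if (delay == null) return; // the response is "lost"
        Task.Delay(delay.Value).ContinueWith(_ => onResponse("Hello " + text));
    }

    // Wraps the callback in a Task that always completes: either with the
    // response, or with a TimeoutException once `timeout` has elapsed.
    public static Task<string> RequestAsync(string text, TimeSpan? responseDelay, TimeSpan timeout)
    {
        var tcs = new TaskCompletionSource<string>();

        // The token is cancelled automatically after `timeout`.
        var timer = new CancellationTokenSource(timeout);
        timer.Token.Register(() =>
            tcs.TrySetException(new TimeoutException("No response within " + timeout)));

        // TrySetResult is a no-op if the timeout already faulted the task.
        SendRequest(text, responseDelay, r => tcs.TrySetResult(r));

        // Release the timer once the task has completed either way.
        tcs.Task.ContinueWith(_ => timer.Dispose());
        return tcs.Task;
    }

    public static void Main()
    {
        // The response arrives well inside the timeout.
        var ok = RequestAsync("world", TimeSpan.FromMilliseconds(50), TimeSpan.FromSeconds(2));
        Console.WriteLine(ok.Result);

        // The response never arrives, so the task faults instead of hanging.
        var lost = RequestAsync("world", null, TimeSpan.FromMilliseconds(200));
        try
        {
            lost.Wait();
        }
        catch (AggregateException ex)
        {
            Console.WriteLine(ex.InnerException.GetType().Name);
        }
    }
}
```

In a Web API action, the same idea would let the request fail with an error response after the timeout rather than leaving the HTTP call open indefinitely. Whether your MassTransit version offers its own timeout hook on the request configurator is something to check against its documentation.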
(Will post updates on this issue here for posterity if no one else does so first.)