Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RabbitMQ not receiving messages when used with TopShelf as a Windows Service

I'm trying to convert my RabbitMQ micro-service to a windows service. I have used TopShelf for the conversion. My RabbitMQ micro-service works perfectly fine on its own but when I run it as a service it no longer receives messages. In my public static void Main(string[] args) I have:

 HostFactory.Run(host =>
                {
                    host.Service<PersonService>(s =>                      
                    {
                        s.ConstructUsing(name => new PersonService());
                        s.WhenStarted(tc => tc.Start());             
                        s.WhenStopped(tc => tc.Stop());               
                    });
                    host.SetDescription("Windows service that provides database access totables."); 
                    host.SetDisplayName("Service");                   
                    host.SetServiceName("Service");
                });
            }

Then in my PersonService class I have

public void Start() {
            ConsumeMessage();
        }

And finally my ConsumeMessage function:

private static void ConsumeMessage() {
        MessagingConfig.SetInstance(new MessagingConstants());
        IMessageFactory pmfInst = MessageFactory.Instance;

        //message worker
        var factory = new ConnectionFactory() {
            HostName = MessagingConfig.Instance.GetBrokerHostName(),
            UserName = MessagingConfig.Instance.GetBrokerUserName(),
            Password = MessagingConfig.Instance.GetBrokerPassword()
        };

        var connection = factory.CreateConnection();

        using (var channel = connection.CreateModel()) {
            channel.QueueDeclare(queue: MessagingConfig.Instance.GetServiceQueueName(),
                                 durable: true,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            channel.BasicQos(0, 1, false);

            var consumer = new EventingBasicConsumer(channel);

            channel.BasicConsume(queue: MessagingConfig.Instance.GetServiceQueueName(),
                                 noAck: false,
                                 consumer: consumer);

            Console.WriteLine("Service.");
            Console.WriteLine(" [x] Awaiting RPC requests");


            // Code Below Is Not Executed In Service
            consumer.Received += (model, ea) => {

                string response = null;

                var body = ea.Body;
                var props = ea.BasicProperties;
                var replyProps = channel.CreateBasicProperties();
                replyProps.CorrelationId = props.CorrelationId;

                string receivedMessage = null;

                try {
                    receivedMessage = Encoding.UTF8.GetString(body);
                    response = ProcessMessage(receivedMessage);
                }
                catch (Exception e) {
                    // Received message is not valid.
                    WinLogger.Log.Error(
                        "Errror Processing Message: " + receivedMessage + " :" + e.Message);

                    response = "";
                }
                finally {

                    var responseBytes = Encoding.UTF8.GetBytes(response);
                    channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,
                    basicProperties: replyProps, body: responseBytes);
                    channel.BasicAck(deliveryTag: ea.DeliveryTag,
                    multiple: false);
                }
            };
            Console.ReadLine();
        }

Looking at A similar SO question it looks like it has something to do with the return the Windows Service is wanting, but I'm not sure of how to call ConsumeMessage so consumer.Received += (model, ea) => {...}; is executed.

EDIT: It looks like my blocking mechanism Console.ReadLine(); is ignored by the service so it just continues on and disposes of the message consumer. So how do I block there for messages to be received?

like image 324
DevEng Avatar asked Aug 14 '17 17:08

DevEng


1 Answers

You code uses using construct, which means when your OnStart method returns, your channel will actually be disposed. The docs suggest to do your initialization on OnStart, so create your channel and consumer there, but don't use using:

this.connection = factory.CreateConnection();

this.channel = connection.CreateModel();
this.consumer = new EventingBasicConsumer(this.channel);

Then those objects will continue to exist after OnStart method is finished. You should dispose of them in the OnStop method.

like image 97
Alex Buyny Avatar answered Nov 10 '22 14:11

Alex Buyny