Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RabbitMQ Missing data in body after dequeue

I'm working with RabbitMQ on someone else's project and having trouble with dequeuing and missing data.

The data is all there as a string when I publish, and it's also on there correctly on the RabbitMQ queue. When I pull the data off, buts of the data is there like the User ID, but the rest of it is gone. I've looked throughout the code and I'm fairly positive that its something going on with RabbitMQ, and its happening when I dequeue. Any help would be greatly appreciated. Thanks. Here is the code right before the publish.

        private bool sendJobToMQ(EncodeJobModel job, string p_correlation_id, string p_request_routing_key)
    {
        JavaScriptSerializer ser = new JavaScriptSerializer();
        StringBuilder sb_job = new StringBuilder();
        ser.Serialize(job, sb_job);
        string rpc_reply_queue;

        ConnectionFactory factory = new ConnectionFactory();
        factory.HostName = HOST_NAME;
        factory.VirtualHost = VHOST_NAME;
        factory.UserName = USERNAME;
        factory.Password = PASSWORD;
        IConnection rabconn = factory.CreateConnection();
        IModel sender_channel = rabconn.CreateModel();
        try
        {
            sender_channel.ExchangeDeclare(EXCHANGE_NAME, ExchangeType.Direct, true, false, null);
        }
        catch (Exception err)
        {
            logger.Error("Error Declaring Exchange " + EXCHANGE_NAME + ": " + err.ToString());
            return false;
        }
        try
        {
            sender_channel.QueueDeclare(REQUEST_QUEUE, true, false, false, null);
        }
        catch (Exception err)
        {
            logger.Error("Error QueueDeclare (" + REQUEST_QUEUE + " true, false, false, null): " + err.ToString());
            return false;
        }
        try
        {
            sender_channel.QueueBind(REQUEST_QUEUE, EXCHANGE_NAME, REQUEST_ROUTING_KEY, null);
        }
        catch (Exception err)
        {
            logger.Error("Error QueueBind (" + REQUEST_QUEUE + " -> " + EXCHANGE_NAME + " " + REQUEST_ROUTING_KEY + ", null): " + err.ToString());
            return false;
        }

        //rpc_reply_queue = sender_channel.QueueDeclare("rq_" + job.encodejob_id.ToString(), false, false, true, null);
        //////bind the rpc reply queue to the exchange via a routing key (I appended _routingkey to signify this)
        //sender_channel.QueueBind(rpc_reply_queue, EXCHANGE_NAME, rpc_reply_queue + "_routingkey");

        //// Not sure what the props object is for yet but you can try to pass null in the mean time - Steve "Apeshit" Han
        BasicProperties props = new BasicProperties();
        props.CorrelationId = p_correlation_id;
        //props.ReplyTo = rpc_reply_queue;

        try
        {
            sender_channel.BasicPublish(EXCHANGE_NAME, REQUEST_ROUTING_KEY, props, Encoding.UTF8.GetBytes(sb_job.ToString()));

        }

And the code for the dequeue.

 QueueingBasicConsumer consumer = new QueueingBasicConsumer(p_channel);
        string consumerTag = p_channel.BasicConsume(p_queue, false, consumer);
        if (_is_console && Environment.UserInteractive)
            Console.WriteLine("Listening...");
        while (m_Listen)
        {
            try
            {
                //get the properties of the message, including the ReplyTo queue, to which we can append '_routingkey' (designated by me), to reply with messages
                BasicDeliverEventArgs e;
                Object message;
                if (!consumer.Queue.Dequeue(4000, out message)) {
                    // we do not wait to indefinitely block on waiting for the queue
                    // if nothing in queue continue loop iteration and wait again
                    continue;
                }

                // cast as necessary back to BasicDeliverEventArgs
                e = (BasicDeliverEventArgs)message;
                IBasicProperties props = e.BasicProperties;
                //get the Correlation ID sent by the client to track the job
                string client_correlation_id = props.CorrelationId;
                // I left out the reply_to field in the wizard, it can be set back in ApiEncodeServiceDefault - Steve "Smurfing Smurf" Han
                //string reply_to = props.ReplyTo;

                //get the body of the request
                byte[] body = e.Body;
                string body_result = Encoding.UTF8.GetString(body);
                bool redelivered = e.Redelivered;

The e.Body string is missing data.

like image 679
user1608132 Avatar asked Nov 13 '22 00:11

user1608132


1 Answers

why continue if you don't have any message it is better to block until you receive a message otherwise the process is not interesting (work with no data?) try like this

QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(queueName, null, consumer);
while (m_Listen) {
try {
RabbitMQ.Client.Events.BasicDeliverEventArgs e =
(RabbitMQ.Client.Events.BasicDeliverEventArgs)
consumer.Queue.Dequeue();
IBasicProperties props = e.BasicProperties;
byte[] body = e.Body;
// ... process the message
channel.BasicAck(e.DeliveryTag, false);
} catch (OperationInterruptedException ex) {
// The consumer was removed, either through
// channel or connection closure, or through the
// action of IModel.BasicCancel().
break;
}

}

like image 172
Hassan Boutougha Avatar answered Dec 06 '22 13:12

Hassan Boutougha