Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Request-response pattern using Spring amqp library

everyone. I have an HTTP API for posting messages in a RabbitMQ broker and I need to implement the request-response pattern in order to receive the responses from the server. So I am something like a bridge between the clients and the server. I push the messages to the broker with specific routing-key and there is a Consumer for that messages, which is publishing back massages as response and my API must consume the response for every request. So the diagram is something like this:

Request-Response pattern

So what I do is the following- For every HTTP session I create a temporary responseQueue(which is bound to the default exchange, with routing key the name of that queue), after that I set the replyTo header of the message to be the name of the response queue(where I will wait for the response) and also set the template replyQueue to that queue. Here is my code:

public void sendMessage(AbstractEvent objectToSend, final String routingKey) {
    final Queue responseQueue = rabbitAdmin.declareQueue();
    byte[] messageAsBytes = null;
    try {
        messageAsBytes = new ObjectMapper().writeValueAsBytes(objectToSend);
    } catch (JsonProcessingException e) {
        e.printStackTrace();
    }
    MessageProperties properties = new MessageProperties();
    properties.setHeader("ContentType", MessageBodyFormat.JSON);
    properties.setReplyTo(responseQueue.getName());
    requestTemplate.setReplyQueue(responseQueue);

    Message message = new Message(messageAsBytes, properties);
    Message receivedMessage = (Message)requestTemplate.convertSendAndReceive(routingKey, message);
}

So what is the problem: The message is sent, after that it is consumed by the Consumer and its response is correctly sent to the right queue, but for some reason it is not taken back in the convertSendAndReceived method and after the set timeout my receivedMessage is null. So I tried to do several things- I started to inspect the spring code(by the way it's a real nightmare to do that) and saw that is I don't declare the response queue it creates a temporal for me, and the replyTo header is set to the name of the queue(the same what I do). The result was the same- the receivedMessage is still null. After that I decided to use another template which uses the default exchange, because the responseQueue is bound to that exchange:

requestTemplate.send(routingKey, message);
Message receivedMessage = receivingTemplate.receive(responseQueue.getName());

The result was the same- the responseMessage is still null. The versions of the amqp and rabbit are respectively 1.2.1 and 1.2.0. So I am sure that I miss something, but I don't know what is it, so if someone can help me I would be extremely grateful.

like image 958
Hristo Angelov Avatar asked Jul 16 '14 15:07

Hristo Angelov


1 Answers

1> It's strange that RabbitTemplate uses doSendAndReceiveWithFixed if you provide the requestTemplate.setReplyQueue(responseQueue). Looks like it is false in your explanation.

2> To make it worked with fixed ReplyQueue you should configure a reply ListenerContainer:

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory);
container.setQueues(responseQueue);
container.setMessageListener(requestTemplate);

3> But the most important part here is around correlation. The RabbitTemplate.sendAndReceive populates correlationId message property, but the consumer side has to get deal with it, too: it's not enough just to send reply to the responseQueue, the reply message should has the same correlationId property. See here: how to send response from consumer to producer to the particular request using Spring AMQP?

BTW there is no reason to populate the Message manually: You can just simply support Jackson2JsonMessageConverter to the RabbitTemplate and it will convert your objectToSend to the JSON bytes automatically with appropriate headers.

like image 110
Artem Bilan Avatar answered Oct 11 '22 20:10

Artem Bilan