I am facing an issue in receiving a message from RabbitMQ. I am sending a message like below
HashMap<Object, Object> senderMap=new HashMap<>();
senderMap.put("STATUS", "SUCCESS");
senderMap.put("EXECUTION_START_TIME", new Date());
rabbitTemplate.convertAndSend(Constants.ADAPTOR_OP_QUEUE,senderMap);
If we see in RabbitMQ, we will get a fully qualified type.
In the current scenario, we have n number of producer for the same consumer. If i use any mapper, it leads to an exception. How will i send a message so that it doesn't contain any type_id and i can receive the message as Message object and later i can bind it to my custom object in the receiver.
I am receiving message like below. Could you please let me know how to use Jackson2MessageConverter so that message will get directly binds to my Object/HashMap from Receiver end. Also i have removed the Type_ID now from the sender.
How Message looks in RabbitMQ
priority: 0 delivery_mode: 2 headers:
ContentTypeId: java.lang.Object KeyTypeId: java.lang.Object content_encoding: UTF-8 content_type: application/json {"Execution_start_time":1473747183636,"status":"SUCCESS"}
@Component
public class AdapterOutputHandler {
private static Logger logger = Logger.getLogger(AdapterOutputHandler.class);
@RabbitListener(containerFactory="adapterOPListenerContainerFactory",queues=Constants.ADAPTOR_OP_QUEUE)
public void handleAdapterQueueMessage(HashMap<String,Object> message){
System.out.println("Receiver:::::::::::"+message.toString());
}
}
Connection
@Bean(name="adapterOPListenerContainerFactory")
public SimpleRabbitListenerContainerFactory adapterOPListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
DefaultClassMapper classMapper = new DefaultClassMapper();
messageConverter.setClassMapper(classMapper);
factory.setMessageConverter(messageConverter);
}
Exception
Caused by: org.springframework.amqp.support.converter.MessageConversionException: failed to convert Message content. Could not resolve __TypeId__ in header and no defaultType provided
at org.springframework.amqp.support.converter.DefaultClassMapper.toClass(DefaultClassMapper.java:139)
I don't want to use __TYPE__ID from sender because they are multiple senders for the same queue and only one consumer.
The Spring AMQP project applies core Spring concepts to the development of AMQP-based messaging solutions. It provides a "template" as a high-level abstraction for sending and receiving messages. It also provides support for Message-driven POJOs with a "listener container".
AMQP stands for Advances Messaging Queing Protocol. The AMQP listener can be used to read from an AMQP destination. This destination can be a queue or topic in an AMQP environment. The connection factory determines which AMQP environment will be used.
RabbitMQ is a messaging broker. It accepts messages from publishers, routes them and, if there were queues to route to, stores them for consumption or immediately delivers to consumers, if any. Consumers consume from queues. In order to consume messages there has to be a queue.
It listens for messages on the spring-boot queue. Because the Receiver class is a POJO, it needs to be wrapped in the MessageListenerAdapter , where you specify that it invokes receiveMessage . JMS queues and AMQP queues have different semantics. For example, JMS sends queued messages to only one consumer.
it leads to an exception
What exception?
TypeId: com.diff.approach.JobListenerDTO
That means you are sending a DTO, not a hash map as you describe in the question.
If you want to remove the typeId header, you can use a message post processor...
rabbitTemplate.convertAndSend(Constants.INPUT_QUEUE, dto, m -> {
m.getMessageProperties.getHeaders().remove("__TypeId__");
return m;
});
(or , new MessagePostProcessor() {...}
if you're not using Java 8).
EDIT
What version of Spring AMQP are you using? With 1.6 you don't even have to remove the __TypeId__
header - the framework looks at the listener parameter type and tells the Jackson converter the type so it automatically converts to that (if it can). As you can see here; it works fine without removing the type id...
package com.example;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class So39443850Application {
private static final String QUEUE = "so39443850";
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(So39443850Application.class, args);
context.getBean(RabbitTemplate.class).convertAndSend(QUEUE, new DTO("baz", "qux"));
context.getBean(So39443850Application.class).latch.await(10, TimeUnit.SECONDS);
context.getBean(RabbitAdmin.class).deleteQueue(QUEUE);
context.close();
}
private final CountDownLatch latch = new CountDownLatch(1);
@RabbitListener(queues = QUEUE, containerFactory = "adapterOPListenerContainerFactory")
public void listen(HashMap<String, Object> message) {
System.out.println(message.getClass() + ":" + message);
latch.countDown();
}
@Bean
public Queue queue() {
return new Queue(QUEUE);
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}
@Bean
public SimpleRabbitListenerContainerFactory adapterOPListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
public static class DTO {
private String foo;
private String baz;
public DTO(String foo, String baz) {
this.foo = foo;
this.baz = baz;
}
public String getFoo() {
return this.foo;
}
public void setFoo(String foo) {
this.foo = foo;
}
public String getBaz() {
return this.baz;
}
public void setBaz(String baz) {
this.baz = baz;
}
}
}
Result:
class java.util.HashMap:{foo=baz, baz=qux}
This is described in the documentation...
In versions prior to 1.6, the type information to convert the JSON had to be provided in message headers, or a custom ClassMapper was required. Starting with version 1.6, if there are no type information headers, the type can be inferred from the target method arguments.
You can also configure a custom ClassMapper
to always return HashMap
.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With