I'm new to Spring boot and I'm playing around with it. Currently I've build some apllications that I want to be able to communicate with each other through queues. I currently have a Listener object that can receive message from a particular queue.
@Configuration
public class Listener {
final static String queueName = "myqueue";
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(listenerAdapter);
return container;
}
@Bean
Receiver receiver() {
return new Receiver();
}
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
}
This works. However, now I want to be able to listen to another queue. So I figured I'd copy the above object and change the queue name. Unfortunately this did not work as Spring boot only creates a connection for one of them. Any ideas on how I can have my Spring Boot application listen to multiple queues?
Ok, I figured out how to get it to listen to multiple queues. Think there might be some downsides compared to my other solution, mainly that it doesn't work if the queue listed does not exist. I ended up using a totally different approach using a @RabbitListener
@Component
public class EventListener {
private static Logger LOG = LoggerFactory.getLogger(EventListener.class);
private CountDownLatch latch = new CountDownLatch(1);
@RabbitListener(queues = "myqueue")
public void processPaymentMessage(Object message) {
LOG.info("Message is of type: " + message.getClass().getName());
if(!(message instanceof byte[])) message = ((Message) message).getBody();
String content = new String((byte[])message, StandardCharsets.UTF_8);
LOG.info("Received on myqueue: " + content);
latch.countDown();
}
@RabbitListener(queues = "myotherqueue")
public void processOrderMessage(Object message) {
LOG.info("Message is of type: " + message.getClass().getName());
if(!(message instanceof byte[])) message = ((Message) message).getBody();
String content = new String((byte[])message, StandardCharsets.UTF_8);
LOG.info("Received on myotherqueue: " + content);
latch.countDown();
}
}
The whole check on byte[] is in there because that what a message send from the commandline looks like. Otherwise it's a org.springframework.amqp.core.Message.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With