Note that I'd like multiple message listeners to handle successive messages from the topic concurrently. In addition I'd like each message listener to operate transactionally so that a processing failure in a given message listener would result in that listener's message remaining on the topic.
The spring DefaultMessageListenerContainer seems to support concurrency for JMS queues only.
Do I need to instantiate multiple DefaultMessageListenerContainers?
If time flows down the vertical axis:
ListenerA reads msg 1 ListenerB reads msg 2 ListenerC reads msg 3
ListenerA reads msg 4 ListenerB reads msg 5 ListenerC reads msg 6
ListenerA reads msg 7 ListenerB reads msg 8 ListenerC reads msg 9
ListenerA reads msg 10 ListenerB reads msg 11 ListenerC reads msg 12
...
UPDATE:
Thanks for your feedback @T.Rob and @skaffman.
What I ended up doing is creating multiple DefaultMessageListenerContainers
with concurrency=1
and then putting logic in the message listener so that only one thread would process a given message id.
concurrency. The number of concurrent sessions/consumers to start for each listener. Can either be a simple number indicating the maximum number (e.g. "5") or a range indicating the lower as well as the upper limit (e.g. "3-5"). Note that a specified minimum is just a hint and might be ignored at runtime.
In queue, you only have one receiver or consumer; unlike in topic where in you can have your message be disseminated to a number of subscribers. Also, in topic, the publisher has to be continuously active for a subscriber to receive the messages. Otherwise the message will be reallocated.
Support for multiple-consumer queues is a Message Queue feature (the JMS specification defines messaging behavior in the case of only one consumer accessing a queue). When multiple consumers access a queue, the load-balancing among them takes into account each consumer's capacity and message processing rate.
Do not raise the number of concurrent consumers for a topic. This would lead to concurrent consumption of the same message, which is hardly ever desirable. This is interesting, and makes sense when you think about it. The same would occur if you had multiple DefaultMessageListenerContainer instances.
You don't want multiple DefaultMessageListenerContainer
instances, no, but you do need to configure the DefaultMessageListenerContainer
to be concurrent, using the concurrentConsumers
property:
Specify the number of concurrent consumers to create. Default is 1.
Specifying a higher value for this setting will increase the standard level of scheduled concurrent consumers at runtime: This is effectively the minimum number of concurrent consumers which will be scheduled at any given time. This is a static setting; for dynamic scaling, consider specifying the "maxConcurrentConsumers" setting instead.
Raising the number of concurrent consumers is recommendable in order to scale the consumption of messages coming in from a queue. However, note that any ordering guarantees are lost once multiple consumers are registered. In general, stick with 1 consumer for low-volume queues.
However, there's big warning at the bottom:
Do not raise the number of concurrent consumers for a topic. This would lead to concurrent consumption of the same message, which is hardly ever desirable.
This is interesting, and makes sense when you think about it. The same would occur if you had multiple DefaultMessageListenerContainer
instances.
I think perhaps you need to rethink your design, although I'm not sure what I'd suggest. Concurrent consumption of pub/sub messages seems like a perfectly reasonable thing to do, but how to avoid getting the same message delivered to all of your consumers at the same time?
At least in ActiveMQ what you want is totally supported, his name is VirtualTopic
The concept is:
VirtualTopic.
) eg. VirtualTopic.Color
Consumer.<clientName>.VirtualTopic.<topicName>
eg. Consumer.client1.VirtualTopic.Color
, doing it, Activemq will create a queue with that name and that queue will subscribe to VirtualTopic.Color
then every message published to this Virtual Topic will be delivered to client1 queue, note that it works like rabbitmq exchanges.VirtualTopic.Color
Here the code
@Component
public class ColorReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(MailReceiver.class);
@Autowired
private JmsTemplate jmsTemplate;
// simply generating data to the topic
long id=0;
@Scheduled(fixedDelay = 500)
public void postMail() throws JMSException, IOException {
final Color colorName = new Color[]{Color.BLUE, Color.RED, Color.WHITE}[new Random().nextInt(3)];
final Color color = new Color(++id, colorName.getName());
final ActiveMQObjectMessage message = new ActiveMQObjectMessage();
message.setObject(color);
message.setProperty("color", color.getName());
LOGGER.info("status=color-post, color={}", color);
jmsTemplate.convertAndSend(new ActiveMQTopic("VirtualTopic.color"), message);
}
/**
* Listen all colors messages
*/
@JmsListener(
destination = "Consumer.client1.VirtualTopic.color", containerFactory = "colorContainer"
selector = "color <> 'RED'"
)
public void genericReceiveMessage(Color color) throws InterruptedException {
LOGGER.info("status=GEN-color-receiver, color={}", color);
}
/**
* Listen only red colors messages
*
* the destination ClientId have not necessary exists (it means that his name can be a fancy name), the unique requirement is that
* the containers clientId need to be different between each other
*/
@JmsListener(
// destination = "Consumer.redColorContainer.VirtualTopic.color",
destination = "Consumer.client1.VirtualTopic.color",
containerFactory = "redColorContainer", selector = "color='RED'"
)
public void receiveMessage(ObjectMessage message) throws InterruptedException, JMSException {
LOGGER.info("status=RED-color-receiver, color={}", message.getObject());
}
/**
* Listen all colors messages
*/
@JmsListener(
destination = "Consumer.client2.VirtualTopic.color", containerFactory = "colorContainer"
)
public void genericReceiveMessage2(Color color) throws InterruptedException {
LOGGER.info("status=GEN-color-receiver-2, color={}", color);
}
}
@SpringBootApplication
@EnableJms
@EnableScheduling
@Configuration
public class Config {
/**
* Each @JmsListener declaration need a different containerFactory because ActiveMQ requires different
* clientIds per consumer pool (as two @JmsListener above, or two application instances)
*
*/
@Bean
public JmsListenerContainerFactory<?> colorContainer(ActiveMQConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrency("1-5");
configurer.configure(factory, connectionFactory);
// container.setClientId("aId..."); lets spring generate a random ID
return factory;
}
@Bean
public JmsListenerContainerFactory<?> redColorContainer(ActiveMQConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
// necessary when post serializable objects (you can set it at application.properties)
connectionFactory.setTrustedPackages(Arrays.asList(Color.class.getPackage().getName()));
final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrency("1-2");
configurer.configure(factory, connectionFactory);
return factory;
}
}
public class Color implements Serializable {
public static final Color WHITE = new Color("WHITE");
public static final Color BLUE = new Color("BLUE");
public static final Color RED = new Color("RED");
private String name;
private long id;
// CONSTRUCTORS, GETTERS AND SETTERS
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With