
How can I handle multiple messages concurrently from a JMS topic (not queue) with Java and Spring 3.0?

Note that I'd like multiple message listeners to handle successive messages from the topic concurrently. In addition, I'd like each message listener to operate transactionally, so that a processing failure in a given message listener would result in that listener's message remaining on the topic.

The Spring DefaultMessageListenerContainer seems to support concurrency for JMS queues only.

Do I need to instantiate multiple DefaultMessageListenerContainers?

With time flowing down the vertical axis, the behaviour I'm after is:

ListenerA reads msg 1        ListenerB reads msg 2        ListenerC reads msg 3
ListenerA reads msg 4        ListenerB reads msg 5        ListenerC reads msg 6
ListenerA reads msg 7        ListenerB reads msg 8        ListenerC reads msg 9
ListenerA reads msg 10       ListenerB reads msg 11       ListenerC reads msg 12
...

UPDATE:
Thanks for your feedback @T.Rob and @skaffman.

What I ended up doing is creating multiple DefaultMessageListenerContainers with concurrency=1 and then putting logic in the message listener so that only one thread would process a given message id.
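
The "only one thread processes a given message id" logic could look roughly like the sketch below. It is an illustration under assumptions, not the code actually used: the class name MessageClaims and its methods are hypothetical, all listeners are assumed to run in the same JVM, and the key is assumed to be the JMS message ID string (for example the value returned by Message.getJMSMessageID()). Claimed IDs are never evicted here, so a real version would need some bound on the set.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: lets exactly one listener thread "win" each message ID.
class MessageClaims {

    // Thread-safe set backed by a ConcurrentHashMap.
    private final Set<String> claimed = ConcurrentHashMap.newKeySet();

    // Returns true only for the first caller that presents this ID.
    boolean tryClaim(String messageId) {
        return claimed.add(messageId);
    }

    // Gives a claim back, e.g. after a processing failure, so another listener may retry.
    void release(String messageId) {
        claimed.remove(messageId);
    }

    public static void main(String[] args) {
        MessageClaims claims = new MessageClaims();
        // Two listeners receive the same topic message; only the first claim succeeds.
        System.out.println("listener A claims ID:1 -> " + claims.tryClaim("ID:1"));
        System.out.println("listener B claims ID:1 -> " + claims.tryClaim("ID:1"));
    }
}
```

Each listener would call tryClaim at the top of onMessage and return immediately when it gets false, calling release only if its own processing fails.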

Edgar Styles asked Jun 21 '10 22:06


2 Answers

You don't want multiple DefaultMessageListenerContainer instances, no, but you do need to configure the DefaultMessageListenerContainer to be concurrent, using the concurrentConsumers property:

Specify the number of concurrent consumers to create. Default is 1.

Specifying a higher value for this setting will increase the standard level of scheduled concurrent consumers at runtime: This is effectively the minimum number of concurrent consumers which will be scheduled at any given time. This is a static setting; for dynamic scaling, consider specifying the "maxConcurrentConsumers" setting instead.

Raising the number of concurrent consumers is recommendable in order to scale the consumption of messages coming in from a queue. However, note that any ordering guarantees are lost once multiple consumers are registered. In general, stick with 1 consumer for low-volume queues.

However, there's a big warning at the bottom:

Do not raise the number of concurrent consumers for a topic. This would lead to concurrent consumption of the same message, which is hardly ever desirable.

This is interesting, and makes sense when you think about it. The same would occur if you had multiple DefaultMessageListenerContainer instances.
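
To make the warning concrete, here is a toy in-memory model. It is not JMS or Spring API: the classes ToyTopic, ToyQueue and FanOutDemo are made up for illustration. It assumes a topic hands a copy of each message to every subscriber, while a queue hands each message to exactly one consumer (chosen round-robin here, which a real broker need not do).

```java
import java.util.ArrayList;
import java.util.List;

class FanOutDemo {
    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        List<String> topicA = topic.subscribe();
        List<String> topicB = topic.subscribe();

        ToyQueue queue = new ToyQueue();
        List<String> queueA = queue.addConsumer();
        List<String> queueB = queue.addConsumer();

        for (int i = 1; i <= 4; i++) {
            topic.publish("msg " + i);
            queue.send("msg " + i);
        }

        // Both topic subscribers see all four messages; the queue consumers split them.
        System.out.println("topic subscriber A: " + topicA);
        System.out.println("topic subscriber B: " + topicB);
        System.out.println("queue consumer A:   " + queueA);
        System.out.println("queue consumer B:   " + queueB);
    }
}

// Toy model of pub/sub: every subscriber receives every message.
class ToyTopic {
    private final List<List<String>> inboxes = new ArrayList<>();

    // Registers a subscriber and returns the list its messages are delivered into.
    List<String> subscribe() {
        List<String> inbox = new ArrayList<>();
        inboxes.add(inbox);
        return inbox;
    }

    void publish(String message) {
        for (List<String> inbox : inboxes) {
            inbox.add(message); // each subscriber gets its own copy
        }
    }
}

// Toy model of point-to-point: each message goes to exactly one consumer.
class ToyQueue {
    private final List<List<String>> inboxes = new ArrayList<>();
    private int next = 0;

    List<String> addConsumer() {
        List<String> inbox = new ArrayList<>();
        inboxes.add(inbox);
        return inbox;
    }

    void send(String message) {
        if (inboxes.isEmpty()) {
            throw new IllegalStateException("no consumers registered");
        }
        inboxes.get(next).add(message);
        next = (next + 1) % inboxes.size(); // simple round-robin
    }
}
```

In this model, raising the number of consumers on the topic only multiplies the work, whereas on the queue it divides it.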

I think perhaps you need to rethink your design, although I'm not sure what I'd suggest. Concurrent consumption of pub/sub messages seems like a perfectly reasonable thing to do, but how do you avoid getting the same message delivered to all of your consumers at the same time?

skaffman answered Sep 23 '22 06:09


At least in ActiveMQ, what you want is fully supported: the feature is called VirtualTopic.

The concept is:

  1. Create a VirtualTopic simply by naming a topic with the prefix VirtualTopic., e.g. VirtualTopic.Color.
  2. Create a consumer on a destination that matches the pattern Consumer.<clientName>.VirtualTopic.<topicName>, e.g. Consumer.client1.VirtualTopic.Color. ActiveMQ then creates a queue with that name and subscribes it to VirtualTopic.Color, so every message published to the virtual topic is delivered to the client1 queue. This works much like RabbitMQ exchanges.
  3. You are done. You can now consume the client1 queue like any other queue, with multiple consumers, a DLQ, a customized redelivery policy, etc.
  4. In the same way you can create client2, client3 and as many subscribers as you want; each of them receives a copy of every message published to VirtualTopic.Color.
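
The naming convention in steps 1 and 2 can be captured in a small helper. This is only a sketch: the class VirtualTopicNames and its methods are hypothetical and not part of ActiveMQ. The check that a client name contains no dot is an assumption based on the broker's default virtual-topic pattern (Consumer.*.VirtualTopic.>), in which the client name occupies a single path segment.

```java
// Hypothetical helper for the virtual topic naming convention described above.
class VirtualTopicNames {

    static final String TOPIC_PREFIX = "VirtualTopic.";
    static final String CONSUMER_PREFIX = "Consumer.";

    // Name of the topic that producers publish to, e.g. VirtualTopic.Color
    static String topic(String topicName) {
        return TOPIC_PREFIX + topicName;
    }

    // Name of the queue that one logical subscriber consumes from,
    // e.g. Consumer.client1.VirtualTopic.Color
    static String consumerQueue(String clientName, String topicName) {
        if (clientName.isEmpty() || clientName.contains(".")) {
            // Assumption: the client name must be exactly one path segment.
            throw new IllegalArgumentException("clientName must be a single segment: " + clientName);
        }
        return CONSUMER_PREFIX + clientName + "." + topic(topicName);
    }

    public static void main(String[] args) {
        System.out.println("publish to:   " + topic("Color"));
        System.out.println("client1 from: " + consumerQueue("client1", "Color"));
        System.out.println("client2 from: " + consumerQueue("client2", "Color"));
    }
}
```

Every listener that shares a client name competes for messages on the same queue, while different client names each get their own copy.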

Here is the code:

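import java.io.IOException;
import java.util.Random;

// javax.jms matches older Spring Boot versions; newer ones use jakarta.jms instead
import javax.jms.JMSException;
import javax.jms.ObjectMessage;

import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component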
public class ColorReceiver {

    private static final Logger LOGGER = LoggerFactory.getLogger(ColorReceiver.class);

    @Autowired
    private JmsTemplate jmsTemplate;

    // Periodically publishes sample data to the topic
    private long id = 0;
    @Scheduled(fixedDelay = 500)
    public void postMail() throws JMSException, IOException {

        final Color colorName = new Color[]{Color.BLUE, Color.RED, Color.WHITE}[new Random().nextInt(3)];
        final Color color = new Color(++id, colorName.getName());
        final ActiveMQObjectMessage message = new ActiveMQObjectMessage();
        message.setObject(color);
        message.setProperty("color", color.getName());
        LOGGER.info("status=color-post, color={}", color);
        jmsTemplate.convertAndSend(new ActiveMQTopic("VirtualTopic.color"), message);
    }

    /**
     * Listens to every color except RED (see the selector)
     */
    @JmsListener(
        destination = "Consumer.client1.VirtualTopic.color", containerFactory = "colorContainer",
        selector = "color <> 'RED'"
    )
    public void genericReceiveMessage(Color color) throws InterruptedException {
        LOGGER.info("status=GEN-color-receiver, color={}", color);
    }

    /**
     * Listens only to RED color messages
     *
     * The client name in the destination does not have to exist beforehand (it can be any name you like);
     * the only requirement is that the containers' clientIds differ from each other
     */
    @JmsListener(
//      destination = "Consumer.redColorContainer.VirtualTopic.color",
        destination = "Consumer.client1.VirtualTopic.color",
        containerFactory = "redColorContainer", selector = "color='RED'"
    )
    public void receiveMessage(ObjectMessage message) throws InterruptedException, JMSException {
        LOGGER.info("status=RED-color-receiver, color={}", message.getObject());
    }

    /**
     * Listens to all color messages (client2 receives its own copy of each one)
     */
    @JmsListener(
        destination = "Consumer.client2.VirtualTopic.color", containerFactory = "colorContainer"
    )
    public void genericReceiveMessage2(Color color) throws InterruptedException {
        LOGGER.info("status=GEN-color-receiver-2, color={}", color);
    }

}

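import java.util.Arrays;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication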
@EnableJms
@EnableScheduling
@Configuration
public class Config {

    /**
     * Each @JmsListener declaration needs a different containerFactory because ActiveMQ requires a different
     * clientId per consumer pool (such as the two @JmsListener methods above, or two application instances)
     */
    @Bean
    public JmsListenerContainerFactory<?> colorContainer(ActiveMQConnectionFactory connectionFactory, 
        DefaultJmsListenerContainerFactoryConfigurer configurer) {

        final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrency("1-5");
        configurer.configure(factory, connectionFactory);
        // factory.setClientId("aId..."); left unset so that Spring generates a random ID
        return factory;
    }

    @Bean
    public JmsListenerContainerFactory<?> redColorContainer(ActiveMQConnectionFactory connectionFactory,
        DefaultJmsListenerContainerFactoryConfigurer configurer) {

        // Needed when sending Serializable objects (can also be set in application.properties)
        connectionFactory.setTrustedPackages(Arrays.asList(Color.class.getPackage().getName()));

        final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrency("1-2");
        configurer.configure(factory, connectionFactory);
        return factory;
    }

}

import java.io.Serializable;

public class Color implements Serializable {

    public static final Color WHITE = new Color("WHITE");
    public static final Color BLUE = new Color("BLUE");
    public static final Color RED = new Color("RED");

    private String name;
    private long id;

    // CONSTRUCTORS, GETTERS AND SETTERS
}

deFreitas answered Sep 25 '22 06:09