Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spring JMS receive topic messages

Tags:

spring

jms

I am working on a simple tutorial. I have a publisher that sends message on a topic and subscribers to receive it. When I start the application, spring config file loads up and then I get the following error

    2011-10-20 21:50:39,340 DEBUG [org.springframework.jms.support.destination.JndiDestinationResolver] - Located object with JNDI name [RateTopic]
2011-10-20 21:50:39,340 DEBUG [org.springframework.jms.support.destination.JndiDestinationResolver] - Located object with JNDI name [RateTopic]
2011-10-20 21:50:39,340 DEBUG [org.springframework.jms.connection.CachingConnectionFactory] - Closing cached Session: ActiveMQSession {id=ID:Reverb0253-PC-62259-1319161839013-0:1:3,started=true}
2011-10-20 21:50:39,340 DEBUG [org.springframework.jms.connection.CachingConnectionFactory] - Closing cached Session: ActiveMQSession {id=ID:Reverb0253-PC-62259-1319161839013-0:1:2,started=true}
2011-10-20 21:50:44,348 WARN [org.springframework.jms.listener.DefaultMessageListenerContainer] - Setup of JMS message listener invoker failed for destination 'RateTopic' - trying to recover. Cause: Destination [RateTopic] is not of expected type [javax.jms.Queue]
org.springframework.jms.support.destination.DestinationResolutionException: Destination [RateTopic] is not of expected type [javax.jms.Queue]
    at org.springframework.jms.support.destination.JndiDestinationResolver.validateDestination(JndiDestinationResolver.java:147)
    at org.springframework.jms.support.destination.JndiDestinationResolver.resolveDestinationName(JndiDestinationResolver.java:112)
    at org.springframework.jms.support.destination.JmsDestinationAccessor.resolveDestinationName(JmsDestinationAccessor.java:100)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.createListenerConsumer(AbstractPollingMessageListenerContainer.java:221)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.initResourcesIfNecessary(DefaultMessageListenerContainer.java:1081)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1057)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1050)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:947)
    at java.lang.Thread.run(Thread.java:722)

Why does spring think that it should be a queue instead of topic

my jndi file looks like this

java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url = tcp://localhost:61616
java.naming.security.principal=system
java.naming.security.credentials=manager
connectionFactoryNames = TopicCF
topic.RateTopic = RateTopic

spring config file is

<bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
    <property name="environment">
        <props>
            <prop key="java.naming.factory.initial">
                org.apache.activemq.jndi.ActiveMQInitialContextFactory
            </prop>
            <prop key="java.naming.provider.url">tcp://localhost:61616</prop>
            <prop key="java.naming.security.principal">system</prop>
            <prop key="java.naming.security.credentials">manager</prop>
        </props>
    </property>
</bean>

<bean id="jndiTopicConnFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiTemplate" ref="jndiTemplate"/>
    <!-- JNDI name of connection factory as defined by provider -->
    <property name="jndiName" value="TopicCF"/>
</bean>

<bean id="topicConnFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="jndiTopicConnFactory"/>
    <!-- Number of sessions that will be cached -->
    <property name="sessionCacheSize" value="1"/>
</bean>

<bean id="destinationResolver" class="org.springframework.jms.support.destination.JndiDestinationResolver">
    <property name="jndiTemplate" ref="jndiTemplate"/>
    <property name="cache" value="true"/>
    <!-- do not create a dynamic destination if the destination name is not found in JNDI -->
    <property name="fallbackToDynamicDestination" value="false"/>
</bean>

<bean id="messageListener" class="com.merc.springjmspubsublenderborrower.TBorrower"/>

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="topicConnFactory"/>
    <property name="destinationResolver" ref="destinationResolver"/>
    <property name="concurrentConsumers" value="3" />
    <property name="destinationName" value="RateTopic"/>
    <property name="messageListener" ref="messageListener" />
    <property name="sessionAcknowledgeModeName" value="AUTO_ACKNOWLEDGE"/>
</bean>

My subscriber implements MessageListener

@Override
public void onMessage(Message message) {
    try {
        // Get the data from the message
        BytesMessage msg = (BytesMessage) message;
        double newRate = msg.readDouble();
        // If the rate is at least 1 point lower than the current rate, then
        //recommend refinancing
        if ((currentRate - newRate) >= 1.0) {
            System.out.println(
                    "New rate = " + newRate + " - Consider refinancing loan");
        } else {
            System.out.println("New rate = " + newRate + " - Keep existing loan");
        }
        System.out.println("\nWaiting for rate updates...");
    } catch (Exception ex) {
        ex.printStackTrace(System.out);
        System.exit(1);
    }
}

public static void main(String argv[]) {

    ApplicationContext ctx = new ClassPathXmlApplicationContext("app-context.xml");

    try {
        // Run until enter is pressed
        BufferedReader stdin = new BufferedReader(new InputStreamReader(System.in));
        System.out.println("TBorrower application started");
        System.out.println("Press enter to quit application");
        stdin.readLine();
    } catch (IOException ioe) {
        ioe.printStackTrace();
    }
}
like image 836
user373201 Avatar asked Oct 21 '11 02:10

user373201


People also ask

How can I send a message to JMS topic in Spring boot?

As we want to send a message to a topic we need to update our SenderConfig configuration. Use the setPubSubDomain() method on the JmsTemplate to set pubSubDomain to true . If you are using the autoconfigured JmsTemplate you can change the JMS domain by setting the spring. jms.

What is @JmsListener?

The JMS Listener adapter is a JMS (Java Message Service) client which provides the ability to perform Active Sync processing on messages from a JMS-compliant messaging system queue or topic. This adapter is a source-only adapter; it cannot write messages back to a queue or topic.

What happens when you send a JMS message to a topic?

When you publish a message it goes to all the subscribers who are interested. Let’s build an example where we send a JMS message to a topic using Spring JMS. On the topic, we will register two subscribers who will each receive the message. We start from a previous Spring JMS sample application.

What is a spring JMS topic?

If you want to learn more about Spring JMS - head on over to the Spring JMS tutorials page. 1. What is a Topic? A JMS topic is a destination that delivers messages to multiple subscribers. It implements publish and subscribe semantics.

What is publish and subscribe semantics in spring JMS?

It implements publish and subscribe semantics. When you publish a message it goes to all the subscribers who are interested. Let’s build an example where we send a JMS message to a topic using Spring JMS.

How can springspring’s jmstemplate receive messages?

Spring’s JmsTemplate can receive messages directly through its receive method, but that works only synchronously, meaning that it blocks.


1 Answers

You're trying to consume from a topic, but you haven't set the pubSubDomain property on the DefaultMessageListenerContainer, and it defaults to "false", meaning point-to-point, meaning a queue instead of a topic. Thus the error message telling you that RateTopic isn't a javax.jms.Queue.

like image 178
Ryan Stewart Avatar answered Nov 15 '22 11:11

Ryan Stewart