I am working on a simple tutorial. I have a publisher that sends message on a topic and subscribers to receive it. When I start the application, spring config file loads up and then I get the following error
2011-10-20 21:50:39,340 DEBUG [org.springframework.jms.support.destination.JndiDestinationResolver] - Located object with JNDI name [RateTopic]
2011-10-20 21:50:39,340 DEBUG [org.springframework.jms.support.destination.JndiDestinationResolver] - Located object with JNDI name [RateTopic]
2011-10-20 21:50:39,340 DEBUG [org.springframework.jms.connection.CachingConnectionFactory] - Closing cached Session: ActiveMQSession {id=ID:Reverb0253-PC-62259-1319161839013-0:1:3,started=true}
2011-10-20 21:50:39,340 DEBUG [org.springframework.jms.connection.CachingConnectionFactory] - Closing cached Session: ActiveMQSession {id=ID:Reverb0253-PC-62259-1319161839013-0:1:2,started=true}
2011-10-20 21:50:44,348 WARN [org.springframework.jms.listener.DefaultMessageListenerContainer] - Setup of JMS message listener invoker failed for destination 'RateTopic' - trying to recover. Cause: Destination [RateTopic] is not of expected type [javax.jms.Queue]
org.springframework.jms.support.destination.DestinationResolutionException: Destination [RateTopic] is not of expected type [javax.jms.Queue]
at org.springframework.jms.support.destination.JndiDestinationResolver.validateDestination(JndiDestinationResolver.java:147)
at org.springframework.jms.support.destination.JndiDestinationResolver.resolveDestinationName(JndiDestinationResolver.java:112)
at org.springframework.jms.support.destination.JmsDestinationAccessor.resolveDestinationName(JmsDestinationAccessor.java:100)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.createListenerConsumer(AbstractPollingMessageListenerContainer.java:221)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.initResourcesIfNecessary(DefaultMessageListenerContainer.java:1081)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1057)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1050)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:947)
at java.lang.Thread.run(Thread.java:722)
Why does spring think that it should be a queue instead of topic
my jndi file looks like this
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url = tcp://localhost:61616
java.naming.security.principal=system
java.naming.security.credentials=manager
connectionFactoryNames = TopicCF
topic.RateTopic = RateTopic
spring config file is
<bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
<property name="environment">
<props>
<prop key="java.naming.factory.initial">
org.apache.activemq.jndi.ActiveMQInitialContextFactory
</prop>
<prop key="java.naming.provider.url">tcp://localhost:61616</prop>
<prop key="java.naming.security.principal">system</prop>
<prop key="java.naming.security.credentials">manager</prop>
</props>
</property>
</bean>
<bean id="jndiTopicConnFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiTemplate" ref="jndiTemplate"/>
<!-- JNDI name of connection factory as defined by provider -->
<property name="jndiName" value="TopicCF"/>
</bean>
<bean id="topicConnFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="jndiTopicConnFactory"/>
<!-- Number of sessions that will be cached -->
<property name="sessionCacheSize" value="1"/>
</bean>
<bean id="destinationResolver" class="org.springframework.jms.support.destination.JndiDestinationResolver">
<property name="jndiTemplate" ref="jndiTemplate"/>
<property name="cache" value="true"/>
<!-- do not create a dynamic destination if the destination name is not found in JNDI -->
<property name="fallbackToDynamicDestination" value="false"/>
</bean>
<bean id="messageListener" class="com.merc.springjmspubsublenderborrower.TBorrower"/>
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="topicConnFactory"/>
<property name="destinationResolver" ref="destinationResolver"/>
<property name="concurrentConsumers" value="3" />
<property name="destinationName" value="RateTopic"/>
<property name="messageListener" ref="messageListener" />
<property name="sessionAcknowledgeModeName" value="AUTO_ACKNOWLEDGE"/>
</bean>
My subscriber implements MessageListener
@Override
public void onMessage(Message message) {
try {
// Get the data from the message
BytesMessage msg = (BytesMessage) message;
double newRate = msg.readDouble();
// If the rate is at least 1 point lower than the current rate, then
//recommend refinancing
if ((currentRate - newRate) >= 1.0) {
System.out.println(
"New rate = " + newRate + " - Consider refinancing loan");
} else {
System.out.println("New rate = " + newRate + " - Keep existing loan");
}
System.out.println("\nWaiting for rate updates...");
} catch (Exception ex) {
ex.printStackTrace(System.out);
System.exit(1);
}
}
public static void main(String argv[]) {
ApplicationContext ctx = new ClassPathXmlApplicationContext("app-context.xml");
try {
// Run until enter is pressed
BufferedReader stdin = new BufferedReader(new InputStreamReader(System.in));
System.out.println("TBorrower application started");
System.out.println("Press enter to quit application");
stdin.readLine();
} catch (IOException ioe) {
ioe.printStackTrace();
}
}
As we want to send a message to a topic we need to update our SenderConfig configuration. Use the setPubSubDomain() method on the JmsTemplate to set pubSubDomain to true . If you are using the autoconfigured JmsTemplate you can change the JMS domain by setting the spring. jms.
The JMS Listener adapter is a JMS (Java Message Service) client which provides the ability to perform Active Sync processing on messages from a JMS-compliant messaging system queue or topic. This adapter is a source-only adapter; it cannot write messages back to a queue or topic.
When you publish a message it goes to all the subscribers who are interested. Let’s build an example where we send a JMS message to a topic using Spring JMS. On the topic, we will register two subscribers who will each receive the message. We start from a previous Spring JMS sample application.
If you want to learn more about Spring JMS - head on over to the Spring JMS tutorials page. 1. What is a Topic? A JMS topic is a destination that delivers messages to multiple subscribers. It implements publish and subscribe semantics.
It implements publish and subscribe semantics. When you publish a message it goes to all the subscribers who are interested. Let’s build an example where we send a JMS message to a topic using Spring JMS.
Spring’s JmsTemplate can receive messages directly through its receive method, but that works only synchronously, meaning that it blocks.
You're trying to consume from a topic, but you haven't set the pubSubDomain
property on the DefaultMessageListenerContainer
, and it defaults to "false", meaning point-to-point, meaning a queue instead of a topic. Thus the error message telling you that RateTopic
isn't a javax.jms.Queue
.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With