I'm using spring integration and its support for MQTT; I saw the spring integration documentation and my simple test case is to publish a message on a MQTT topic. The Spring documentation is located here: http://docs.spring.io/spring-integration/reference/html/mqtt.html#_configuring_with_java_configuration_15
I'm using these versions:
I built this simple configuration class:
@Configuration
@IntegrationComponentScan
public class CommunicationServerApplication
{
@Bean
public MqttPahoClientFactory mqttClientFactory()
{
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setServerURIs(mqttServerUris);
if (StringUtils.hasText(mqttUsername) && StringUtils.hasText(mqttPassword))
{
factory.setUserName(mqttUsername);
factory.setPassword(mqttPassword);
}
factory.setConnectionTimeout(mqttConnectionTimeout);
factory.setKeepAliveInterval(mqttKeepAliveInterval);
factory.setPersistence(new MqttDefaultFilePersistence(mqttPersistenceFileDirectory));
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel", autoStartup="true")
public MessageHandler mqttOutbound()
{
String clientId = mqttClientId;
if( !StringUtils.hasText(clientId) )
{
clientId = UUID.randomUUID().toString();
}
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(mqttTopic);
if( mqttQos >= 0 && mqttQos <=2 )
{
messageHandler.setDefaultQos(mqttQos);
}
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel()
{
DirectChannel dc = new DirectChannel();
return dc;
}
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttMsgproducer
{
void sendToMqtt(String data);
}
}
And then I used this simple test case:
@ContextConfiguration(value ={ "classpath:app-ctx.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class SimpleMqttTestSuite
{
private static final Logger logger = LoggerFactory.getLogger(SimpleMqttTestSuite.class.getName());
@Autowired
private MqttMsgproducer sender;
@Test
public void startServerTest()
{
try
{
sender.sendToMqtt("Hello");
}
catch (Exception e)
{
logger.error("Error", e);
}
}
}
My app-ctx.xml is:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:p="http://www.springframework.org/schema/p" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:oxm="http://www.springframework.org/schema/oxm"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
http://www.springframework.org/schema/oxm http://www.springframework.org/schema/oxm/spring-oxm.xsd
http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd">
<context:component-scan base-package="it.olegna.test.integration" />
<context:property-placeholder location="classpath:configuration.properties"
order="0" ignore-resource-not-found="true" ignore-unresolvable="true" />
</beans>
Executing the simple test, I'm having this error:
2016-12-20 10:46:33,889 49967 [nioEventLoopGroup-3-1] ERROR - Errore
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'org.springframework.context.support.GenericApplicationContext@2e6a8155.mqttOutboundChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:81) ~[spring-integration-core-4.3.5.RELEASE.jar:4.3.5.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) ~[spring-integration-core-4.3.5.RELEASE.jar:4.3.5.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) ~[spring-integration-core-4.3.5.RELEASE.jar:4.3.5.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.4.RELEASE.jar:4.3.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.4.RELEASE.jar:4.3.4.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.4.RELEASE.jar:4.3.4.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:143) ~[spring-messaging-4.3.4.RELEASE.jar:4.3.4.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:135) ~[spring-messaging-4.3.4.RELEASE.jar:4.3.4.RELEASE]
at org.springframework.integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:375) ~[spring-integration-core-4.3.5.RELEASE.jar:4.3.5.RELEASE]
at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:477) ~[spring-integration-core-4.3.5.RELEASE.jar:4.3.5.RELEASE]
at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:429) ~[spring-integration-core-4.3.5.RELEASE.jar:4.3.5.RELEASE]
at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:420) ~[spring-integration-core-4.3.5.RELEASE.jar:4.3.5.RELEASE]
at org.springframework.integration.gateway.GatewayCompletableFutureProxyFactoryBean.invoke(GatewayCompletableFutureProxyFactoryBean.java:65) ~[spring-integration-core-4.3.5.RELEASE.jar:4.3.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) ~[spring-aop-4.3.4.RELEASE.jar:4.3.4.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213) ~[spring-aop-4.3.4.RELEASE.jar:4.3.4.RELEASE]
I can't figure what I'm missing in the configuration. Can anybody give to me a tip?
thank you
Angelo
I solved my issue
It was related to the fact that I built the Channel but now handler have been subscribed
In my application class I did the following:
@Bean
public MessageChannel mqttOutboundChannel()
{
DirectChannel dc = new DirectChannel();
dc.subscribe(mqttOutbound());
return dc;
}
As you can see now I manually add subscribe the bean mqttOutbound (the message handler) to the Channel
By doing in this way all works
I hope this can help
Angelo
UPDATE AFTER Gary Russell ANSWER
As suggested by Gary Russell I didn't subscribe to the Channel
I added the annotation @EnableIntegration
So my Application class now is the following:
@Configuration
@IntegrationComponentScan
@EnableIntegration
public class CommunicationServerApplication
{
@Bean
public MqttPahoClientFactory mqttClientFactory()
{
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setServerURIs(mqttServerUris);
if (StringUtils.hasText(mqttUsername) && StringUtils.hasText(mqttPassword))
{
factory.setUserName(mqttUsername);
factory.setPassword(mqttPassword);
}
factory.setConnectionTimeout(mqttConnectionTimeout);
factory.setKeepAliveInterval(mqttKeepAliveInterval);
factory.setPersistence(new MqttDefaultFilePersistence(mqttPersistenceFileDirectory));
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel", autoStartup="true")
public MessageHandler mqttOutbound()
{
String clientId = mqttClientId;
if( !StringUtils.hasText(clientId) )
{
clientId = UUID.randomUUID().toString();
}
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(mqttTopic);
if( mqttQos >= 0 && mqttQos <=2 )
{
messageHandler.setDefaultQos(mqttQos);
}
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel()
{
DirectChannel dc = new DirectChannel();
return dc;
}
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttMsgproducer
{
void sendToMqtt(String data);
}
}
Your solution is incorrect - you must not subscribe within a channel bean definition. I believe your problem is that you are missing @EnableIntegration
on the class.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With