Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Trying to migrate over from <rabbit:> xml name space config to a java @Configurable cannot replicate

I am trying to migrate from XML Spring amqp config to a java annotation based one because it is "simpler". Not sure what I am doing wrong the XML config works fine but the java @Configurable throws a "Caused by: java.net.SocketException: Connection reset" exception.

XML config (works perfectly):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/util
           http://www.springframework.org/schema/util/spring-util.xsd
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- define which properties files will be used -->
    <context:property-placeholder location="classpath:*.properties" />

    <rabbit:connection-factory id="connectionFactory"
                               addresses='${rabbitmq.hostname}'
                               username='${rabbitmq.username}'
                               password='${rabbitmq.password}' 
                               virtual-host='${rabbitmq.virtual_host}'  
                               cache-mode='${rabbitmq.cache_mode}'                             
                               channel-cache-size='${rabbitmq.channel_cache_size}'/>

    <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="3"/>
        <property name="maxPoolSize" value="5"/>
        <property name="queueCapacity" value="15"/>                          
    </bean>                              


    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
    <rabbit:admin connection-factory="connectionFactory"/>
       <rabbit:queue name="${rabbitmq.queue_name}" />
<rabbit:topic-exchange name="${rabbitmq.topic_exchange_name}">
    <rabbit:bindings>
        <rabbit:binding queue="${rabbitmq.queue_name}" pattern="${rabbitmq.topic_exchange_pattern}"/>
    </rabbit:bindings>
</rabbit:topic-exchange>

    <bean id="listener" class="com.my.package.path.worker.DefaultMessageListener"/>


    <rabbit:listener-container id="listenerContainer" connection-factory="connectionFactory" task-executor="taskExecutor">
            <rabbit:listener ref="listener" queues="notification.main" />

    </rabbit:listener-container>
</beans>

Java config:

@Configurable
@PropertySource("classpath:rabbitmq.properties")
public class RabbitMQConfig {

@Value("${rabbitmq.hostname}")
private String hostname;

@Value("${rabbitmq.port}")
private String port; 

@Value("${rabbitmq.username}")
private String username;

@Value("${rabbitmq.password}")
private String password; 

@Value("${rabbitmq.virtual_host}")
private String virtualHost; 

//@Value("${rabbitmq.cache_mode}")
//private String cacheMode;

@Value("${rabbitmq.channel_cache_size}")
private String channelCacheSize;

@Value("${rabbitmq.topic_exchange_name}")
private String topicExchangeName;

@Value("${rabbitmq.topic_exchange_pattern}")
private String topicExchangePattern;

@Value("${rabbitmq.queue_name}")
private String queueName; 

@Autowired
private ConnectionFactory cachingConnectionFactory;

@Bean(name="cachingConnectionFactory")
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(hostname,Integer.valueOf(port));

    connectionFactory.setUsername(username);
    connectionFactory.setPassword(password);

    //connectionFactory.setCacheMode(CacheMode.valueOf(cacheMode));
    connectionFactory.setChannelCacheSize(Integer.valueOf( channelCacheSize ));

    return connectionFactory;
}

@Bean(name="taskExecutor")
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
    ThreadPoolTaskExecutor tpte = new ThreadPoolTaskExecutor();
    tpte.setCorePoolSize(3);
    tpte.setMaxPoolSize(5);
    tpte.setQueueCapacity(15);
    return tpte;
}

@Bean
public AmqpTemplate AmqpTemplate() {
    RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);

    return template;
}


@Bean
public AmqpAdmin amqpAdmin() {
    RabbitAdmin amqpAdmin = new RabbitAdmin(cachingConnectionFactory);

    return amqpAdmin;
}

@Bean
public Queue queue() {
    return new Queue(queueName);
}

@Bean
public TopicExchange topicExchange() {
    TopicExchange topicExchange = new TopicExchange(topicExchangeName);
    return topicExchange;
}

@Bean
public Binding dataBinding(TopicExchange topicExchange, Queue queue) {
    return BindingBuilder.bind(queue).to(topicExchange).with(topicExchangePattern);
}

@Bean 
public DefaultMessageListener defaultMessageListener() {
    return new DefaultMessageListener();
}

@Bean 
public SimpleMessageListenerContainer container(DefaultMessageListener defaultMessageListener) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(cachingConnectionFactory);
    container.setQueueNames(queueName);
    container.setAutoStartup(true);
    container.setMessageListener(defaultMessageListener);
    //container.setTaskExecutor(taskExecutor);
    return container;
}

@Bean
public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() {
   return new PropertySourcesPlaceholderConfigurer();
}

java config error:

INFO : org.springframework.context.support.DefaultLifecycleProcessor - Starting beans in phase 2147483647
DEBUG: org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - No global properties bean
DEBUG: org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Starting Rabbit listener container.
ERROR: org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Failed to check/redeclare auto-delete queue(s).
org.springframework.amqp.AmqpIOException: java.io.IOException
    at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:63)
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:217)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:444)
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createConnection(ConnectionFactoryUtils.java:80)
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:130)
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:67)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1035)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1028)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1004)
    at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:254)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.redeclareElementsIfNecessary(SimpleMessageListenerContainer.java:963)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$300(SimpleMessageListenerContainer.java:83)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1081)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:376)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:603)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:637)
    at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:208)
    ... 12 more
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:348)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:221)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 16 more
Caused by: java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:209)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:534)
    ... 1 more

I stepped into the Spring amqp code and the culprit is the RabbitAdmin#getQueueProperties method. In the XML config it executes fine... but as soon as it executes with the java config it throws the exception above? What I am doing that is different? Both configs look the same to me.

package org.springframework.amqp.rabbit.core;

public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, InitializingBean {
//... 
    @Override
    public Properties getQueueProperties(final String queueName) {
        Assert.hasText(queueName, "'queueName' cannot be null or empty");
        return this.rabbitTemplate.execute(new ChannelCallback<Properties>() {
            @Override
            public Properties doInRabbit(Channel channel) throws Exception {
                try {
                    DeclareOk declareOk = channel.queueDeclarePassive(queueName);
                    Properties props = new Properties();
                    props.put(QUEUE_NAME, declareOk.getQueue());
                    props.put(QUEUE_MESSAGE_COUNT, declareOk.getMessageCount());
                    props.put(QUEUE_CONSUMER_COUNT, declareOk.getConsumerCount());
                    return props;
                }
                catch (Exception e) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Queue '" + queueName + "' does not exist");
                    }
                    return null;
                }
            }
        });
    }
}

Both configs use the exact same rabbitmq.properties file on the classpath. I am even checked the attributes of RabbitAdmin and RabbitTemplate classes at runtime for both configs and they look exactly the same...

like image 243
Selwyn Avatar asked May 19 '15 15:05

Selwyn


2 Answers

You should be using @Configuration, not @Configurable.

EDIT:

Looks like the rabbitmq-server is closing the connection:

Caused by: java.net.SocketException: Connection reset

Look in the server log; if that doesn't help; post a full DEBUG log for org.springframework somewhere (probably too big for here).

EDIT2:

You have an authentication problem...

{handshake_error,opening,0,
             {amqp_error,access_refused,
                         "access to vhost '/' refused for user 'gggdw'",
                         'connection.open'}}

...check your username and pasword (and vhost).

like image 195
Gary Russell Avatar answered Oct 29 '22 17:10

Gary Russell


I am not using the '/' root virtual host. I had my own custom value for virtual_host. Although, I did inject this property via spel into my java config I did not explicitly set it on the connectionFactory.

connectionFactory.setVirtualHost(virtualHost);

Thanks to @Gary Russell for helping me trouble shoot.

@Bean(name="cachingConnectionFactory")
public ConnectionFactory connectionFactory() {

        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(hostname,Integer.valueOf(port));

        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);

        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setChannelCacheSize(Integer.valueOf( channelCacheSize ));

        return connectionFactory;
    }
like image 34
Selwyn Avatar answered Oct 29 '22 16:10

Selwyn