Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Re-queue rabbit AMQP messages at the tail of the queue after max attempts

I'm currently using RabbitMQ with spring (spring-rabbit-1.2.0-RELEASE), with the configuration below :

<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>
<!-- Asynchronous exchanges -->
<!-- Admin -->
<rabbit:admin connection-factory="connectionFactory"/>

<!-- Error Handler -->
<bean id="biErrorHandler" class="my.project.sync.BiErrorHandler" />
<!-- Message converter -->
<bean id="biMessageConverter" class="my.project.sync.BiMessageConverter"/>

<bean id="retryInterceptor" class="org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean">
    <property name="messageRecoverer" ref="rejectAndDontRequeueRecoverer"/>
    <property name="retryOperations" ref="retryTemplate" />
    <property name="messageKeyGenerator" ref="biKeyGenerator" />
</bean> 

<bean id="biKeyGenerator" class="my.project.sync.BiMessageKeyGenerator"/>
<bean id="rejectAndDontRequeueRecoverer" class="org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer"/>

<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="backOffPolicy">
        <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
            <property name="initialInterval" value="3000" />
            <property name="maxInterval" value="30000" />
        </bean>     
    </property>
    <property name="retryPolicy">
        <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
            <property name="maxAttempts" value="3" />
        </bean>     
    </property>
</bean>  

 <rabbit:queue id="biSynchronizationQueue" name="BI_SYNCHRONIZATION_QUEUE" durable="true" />
<rabbit:listener-container message-converter="biMessageConverter" concurrency="1" 
    connection-factory="connectionFactory"
    error-handler="biErrorHandler"
    advice-chain="retryInterceptor"
    acknowledge="auto">
    <rabbit:listener queues="BI_SYNCHRONIZATION_QUEUE" ref="biSynchronizationService" method="handleMessage"/>
</rabbit:listener-container>

I would like to requeue the message at the tail of the queue after the third attempt. But i don't find a way to perform that.

Is someone has an idea?

Thanks in advance for your help.

like image 507
Denis Cucchietti Avatar asked Dec 25 '22 14:12

Denis Cucchietti


2 Answers

Instead of using the RejectAndDontRequeueRecoverer, use a custom MessageRecoverer that uses a RabbitTemplate to send the message to the back of the queue; then throw an AmqpRejectAndDontRequeueException so the message will be rejected.

You could subclass the RejectAndDontRequeueRecoverer, send the message in recover() and then call super.recover() (which simply throws the exception). Or just do all the implementation in your own recover().

like image 195
Gary Russell Avatar answered Jan 05 '23 00:01

Gary Russell


I used this tip, but then went a slightly different direction by using a deadletter queue. For me, it was more explicit than putting the message at the end of the queue.

 <rabbit:queue name="content.variantchange.queue" durable="true" queue-arguments="queueArguments"/>

<util:map id="queueArguments">
    <entry key="x-dead-letter-exchange" value="content.deadletter.topic"/>
</util:map>

  <rabbit:fanout-exchange name="content.deadletter.topic">
    <rabbit:bindings>
        <rabbit:binding queue="content.deadletter.queue"/>
    </rabbit:bindings>
</rabbit:fanout-exchange>

<rabbit:queue name="content.deadletter.queue" durable="true"/>

Once you have the dead letter queue in place, you can do whatever you want via another consumer (including adding it back to the original queue.)

I also ended up using the stateless version of the interceptor Gary recommended which allowed me to not have to worry about a message id generator.

<bean id="retryInterceptor" class="org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean">
    <property name="messageRecoverer" ref="rejectAndDontRequeueRecoverer"/>
    <property name="retryOperations" ref="retryTemplate" />
</bean>
like image 32
Ryan Walls Avatar answered Jan 05 '23 01:01

Ryan Walls