
How do we hook into before/after message processing using @RabbitListener?

Problem: I am migrating from a MessageListener interface implementation to @RabbitListener. My "pre" and "post" message processing lived in an abstract MessageListener that several classes inherited.

example:

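import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public abstract class AbstractMessageListener implements MessageListener {

     @Override
     public void onMessage(Message message) {

          // do some pre message processing

          process(message);

          // do some post message processing
     }

     // each subclass supplies the actual message handling
     protected abstract void process(Message message);

}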

Question: Is there a way to achieve something similar with the @RabbitListener annotation, so that I can inherit the pre/post message processing logic without re-implementing or calling it inside each child @RabbitListener, while still keeping customizable method signatures for each child @RabbitListener? Or is this being too greedy?

Example desired result:

public class SomeRabbitListenerClass {

    @RabbitListener(id = "listener.mypojo", queues = "${rabbitmq.some.queue}")
    public void listen(@Valid MyPojo myPojo) {
        //...
    }
}

public class SomeOtherRabbitListenerClass {

    @RabbitListener(id = "listener.orders", queues = "${rabbitmq.some.other.queue}")
    public void listen(Order order, @Header("order_type") String orderType) {
        //...
    }
}

Both of these @RabbitListener methods would use the same inherited pre/post message processing.

I see there is a 'containerFactory' argument in the @RabbitListener annotation, but I'm already declaring one in the config, and I'm not really sure how to achieve the inheritance I want with a custom containerFactory.


Updated Answer: This is what I ended up doing.

Advice definition:

import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.springframework.amqp.core.Message;

/**
 * AOP Around advice wrapper. Every time a message comes in we can do 
 * pre/post processing by using this advice by implementing the before/after methods.
 * @author sjacobs
 *
 */
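public class RabbitListenerAroundAdvice implements MethodInterceptor {

    /**
     * place the "AroundAdvice" around each new message being processed.
     */
    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {

        // arguments are the Channel (index 0) and the Message (index 1)
        Message message = (Message) invocation.getArguments()[1];

        before(message);
        Object result = invocation.proceed();
        after(message);

        return result;
    }

    protected void before(Message message) {
        // pre message processing goes here
    }

    protected void after(Message message) {
        // post message processing goes here
    }
}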

Declare beans: In your RabbitMQ config, declare the advice as a Spring bean and pass it to rabbitListenerContainerFactory#setAdviceChain(...)

//...

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory( cachingConnectionFactory() );
        factory.setTaskExecutor(threadPoolTaskExecutor());
        factory.setMessageConverter(jackson2JsonMessageConverter());   

        factory.setAdviceChain(rabbitListenerAroundAdvice());

        return factory;
    }

    @Bean
    public RabbitListenerAroundAdvice rabbitListenerAroundAdvice() {
        return new RabbitListenerAroundAdvice();
    }

// ...
Selwyn asked Dec 29 '15 15:12



1 Answer

Correction

You can use the advice chain in the SimpleRabbitListenerContainerFactory to apply an around advice to the listeners created for @RabbitListener; the two arguments of the advised invocation are the Channel and the Message.
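
One detail worth checking in an around advice: if after(message) is called on the line after proceed(), it is skipped whenever the listener throws. The sketch below illustrates the same around pattern with a try/finally so the "after" step always runs. It uses only the JDK: Invocation is a hypothetical stand-in for org.aopalliance.intercept.MethodInvocation, and plain strings stand in for the Channel and Message, so treat it as an illustration of the control flow rather than Spring AMQP code.

```java
import java.util.ArrayList;
import java.util.List;

public class AroundAdviceSketch {

    /** Hypothetical stand-in for AOP Alliance's MethodInvocation. */
    interface Invocation {
        Object[] getArguments();
        Object proceed() throws Exception;
    }

    /** Records the order in which the steps run. */
    public static final List<String> events = new ArrayList<>();

    /** Runs "before", then the listener, then "after"; finally keeps "after" on failure. */
    static Object invoke(Invocation invocation) throws Exception {
        Object message = invocation.getArguments()[1]; // [0] = channel, [1] = message
        events.add("before " + message);
        try {
            return invocation.proceed();
        } finally {
            events.add("after " + message);
        }
    }

    /** Builds a fake invocation whose listener either succeeds or throws. */
    static Invocation invocationFor(String message, boolean fail) {
        return new Invocation() {
            public Object[] getArguments() {
                return new Object[] {"channel", message};
            }
            public Object proceed() {
                if (fail) {
                    throw new IllegalStateException("listener failed");
                }
                events.add("listen " + message);
                return null;
            }
        };
    }

    public static void main(String[] args) throws Exception {
        invoke(invocationFor("order-1", false));
        try {
            invoke(invocationFor("order-2", true));
        } catch (IllegalStateException expected) {
            events.add("failed order-2");
        }
        System.out.println(events);
    }
}
```

Whether "after" should run on failure depends on what the post-processing does; if it should only run on success, the straight-line before/proceed/after form is the right one.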

If you only need to take action before calling the listener, you can instead add one or more MessagePostProcessor instances to the container's afterReceivePostProcessors.
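
As a rough sketch of the post-processor route, the configuration below registers a MessagePostProcessor on the listener container factory. It assumes a Spring AMQP version whose container factory exposes setAfterReceivePostProcessors(...) (older versions may require setting it on the container itself). The class name, bean names, and the "receivedAt" header are made up for illustration.

```java
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitPreProcessingConfig {

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // Applied to each received message before the listener method is invoked.
        factory.setAfterReceivePostProcessors(receivedAtPostProcessor());
        return factory;
    }

    // Hypothetical "pre" step: stamp each message with the time it was received.
    @Bean
    public MessagePostProcessor receivedAtPostProcessor() {
        return message -> {
            message.getMessageProperties().setHeader("receivedAt", System.currentTimeMillis());
            return message;
        };
    }
}
```

A post-processor only sees the message on the way in, so it covers the "pre" half; anything that must run after the listener returns still needs the advice chain.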

Gary Russell answered Sep 17 '22 12:09
