My question is really a follow up question to
RabbitMQ Integration Test and Threading
There it states to wrap "your listeners" and pass in a CountDownLatch, so that eventually all the threads merge. That answer works if we create and inject the message listener manually, but with @RabbitListener annotations I'm not sure how to pass in a CountDownLatch, because the framework creates the message listener automatically behind the scenes.
Are there any other approaches?
With the help of @Gary Russell I was able to get an answer and used the following solution.
Conclusion: I must admit I have mixed feelings about this solution (it feels like a hack), but it is the only thing I could get to work, and once you get past the one-time setup and understand the workflow it is not so painful. It basically comes down to defining two @Beans and adding them to your integration test config.
Example solution posted below with explanations. Please feel free to suggest improvements to this solution.
1. Define a ProxyListenerBPP that, during Spring initialization, looks for beans of a specified class (i.e. the class that contains the @RabbitListener methods) and wraps them with our custom CountDownLatchListenerInterceptor advice, defined in the next step.
import org.aopalliance.aop.Advice;
import org.springframework.aop.framework.ProxyFactoryBean;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.core.Ordered;
import org.springframework.core.PriorityOrdered;
/**
* Implements BeanPostProcessor bean... during spring initialization we will
* listen for a specified clazz
* (i.e our @RabbitListener annotated class) and
* inject our custom CountDownLatchListenerInterceptor advice
* @author sjacobs
*
*/
public class ProxyListenerBPP implements BeanPostProcessor, BeanFactoryAware, Ordered, PriorityOrdered {

    private BeanFactory beanFactory;
    private final Class<?> clazz;
    public static final String ADVICE_BEAN_NAME = "wasCalled";

    public ProxyListenerBPP(Class<?> clazz) {
        this.clazz = clazz;
    }

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
    }

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // only proxy beans of the configured class; everything else passes through untouched
        if (clazz.isAssignableFrom(bean.getClass())) {
            ProxyFactoryBean pfb = new ProxyFactoryBean();
            pfb.setProxyTargetClass(true); // CGLIB, false for JDK proxy (interface needed)
            pfb.setTarget(bean);
            pfb.addAdvice(this.beanFactory.getBean(ADVICE_BEAN_NAME, Advice.class));
            return pfb.getObject();
        }
        else {
            return bean;
        }
    }

    @Override
    public int getOrder() {
        return Ordered.LOWEST_PRECEDENCE - 1000; // Just before @RabbitListener post processor
    }
}
2. Create the MethodInterceptor advice implementation that holds the reference to the CountDownLatch. The CountDownLatch must be visible both to the integration test thread and to the async worker thread that runs the @RabbitListener method, so that control returns to the integration test thread as soon as the listener thread has finished. No polling is needed.
import java.util.concurrent.CountDownLatch;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
/**
* AOP MethodInterceptor that maps a <b>Single</b> CountDownLatch to one method and invokes
* CountDownLatch.countDown() after the method has completed execution. The motivation behind this
* is for integration testing purposes of Spring RabbitMq Async Worker threads to be able to merge
* the Integration Test thread after an Async 'worker' thread completed its task.
* @author sjacobs
*
*/
public class CountDownLatchListenerInterceptor implements MethodInterceptor {

    // volatile: written by the listener thread, read by the integration test thread
    private volatile CountDownLatch countDownLatch = new CountDownLatch(1);
    private final String methodNameToInvokeCDL;

    public CountDownLatchListenerInterceptor(String methodName) {
        this.methodNameToInvokeCDL = methodName;
    }

    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        String methodName = invocation.getMethod().getName();
        if (this.methodNameToInvokeCDL.equals(methodName)) {
            // invoke async work
            Object result = invocation.proceed();
            // returns us back to the 'awaiting' thread inside the integration test
            this.countDownLatch.countDown();
            // "reset" CountDownLatch for next @Test (if testing for more async worker)
            this.countDownLatch = new CountDownLatch(1);
            return result;
        } else {
            return invocation.proceed();
        }
    }

    // Call this before triggering the listener: once the listener has returned,
    // the field already points at a fresh latch for the next invocation.
    public CountDownLatch getCountDownLatch() {
        return countDownLatch;
    }
}
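The interception idea from steps 1 and 2 can be tried without Spring or a broker. The following is only a sketch, not the code from this answer: it swaps Spring's ProxyFactoryBean for a plain JDK dynamic proxy (java.lang.reflect.Proxy), and OrderListener, LatchProxyDemo, withLatch and runOnce are hypothetical names made up for the illustration.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical listener contract; stands in for the @RabbitListener class.
interface OrderListener {
    void listen(String message);
}

class LatchProxyDemo {

    // Wraps target in a JDK dynamic proxy that counts the latch down
    // after the method named methodName has returned normally.
    static OrderListener withLatch(OrderListener target, String methodName, CountDownLatch latch) {
        InvocationHandler handler = (proxy, method, args) -> {
            Object result = method.invoke(target, args);
            if (method.getName().equals(methodName)) {
                latch.countDown();
            }
            return result;
        };
        return (OrderListener) Proxy.newProxyInstance(
                OrderListener.class.getClassLoader(),
                new Class<?>[] { OrderListener.class },
                handler);
    }

    // Calls the proxied listener on a separate thread and reports whether
    // the latch was released before the timeout.
    static boolean runOnce() {
        CountDownLatch latch = new CountDownLatch(1);
        OrderListener proxied = withLatch(msg -> { /* pretend to do work */ }, "listen", latch);
        new Thread(() -> proxied.listen("payload")).start();
        try {
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("listener finished in time: " + runOnce());
    }
}
```

The calling thread blocks only until the proxied method returns, which is the same handoff the interceptor above performs for the real listener.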
3. Next add to your Integration Test Config the following @Bean(s)
public class SomeClassThatHasRabbitListenerAnnotationsITConfig extends BaseIntegrationTestConfig {

    // pass the class that contains the @RabbitListener annotation into the constructor
    @Bean
    public static ProxyListenerBPP listenerProxier() { // note static
        return new ProxyListenerBPP(SomeClassThatHasRabbitListenerAnnotations.class);
    }

    // pass the name of the method that the async thread invokes in SomeClassThatHasRabbitListenerAnnotations,
    // i.e. the method annotated with @RabbitListener or @RabbitHandler;
    // in our example 'listen' is the method name inside SomeClassThatHasRabbitListenerAnnotations
    @Bean(name = ProxyListenerBPP.ADVICE_BEAN_NAME)
    public static Advice wasCalled() {
        String methodName = "listen";
        return new CountDownLatchListenerInterceptor(methodName);
    }

    // this is the @RabbitListener bean we are testing
    @Bean
    public SomeClassThatHasRabbitListenerAnnotations rabbitListener() {
        return new SomeClassThatHasRabbitListenerAnnotations();
    }
}
4. Finally, in the integration @Test, after sending a message via rabbitTemplate to trigger the async thread, call CountDownLatch#await(...) on the latch obtained from the interceptor, and make sure to pass a timeout and TimeUnit so the test cannot hang if the process runs long or something goes wrong. Once the async work completes, the integration test thread is notified (awakened) and we can finally test/validate/verify the results of the async work.
@ContextConfiguration(classes = { SomeClassThatHasRabbitListenerAnnotationsITConfig.class })
public class SomeClassThatHasRabbitListenerAnnotationsIT extends BaseIntegrationTest {

    @Inject
    private CountDownLatchListenerInterceptor interceptor;

    @Inject
    private RabbitTemplate rabbitTemplate;

    @Test
    public void shouldReturnBackAfterAsyncThreadIsFinished() throws Exception {
        // grab the latch before sending: the interceptor swaps in a new latch once the listener returns
        CountDownLatch cdl = interceptor.getCountDownLatch();

        MyObject payload = new MyObject();
        rabbitTemplate.convertAndSend("some.defined.work.queue", payload);

        // wait for async thread to finish
        // IMPORTANT: set timeout args; await returns false if the timeout elapsed first
        if (!cdl.await(10, TimeUnit.SECONDS)) {
            throw new AssertionError("listener did not finish within 10 seconds");
        }

        // Begin the actual testing of the results of the async work
        // check the database?
        // download a msg from another queue?
        // verify email was sent...
        // etc...
    }
}
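The waiting side of step 4 can also be exercised in isolation. This is a minimal sketch using only java.util.concurrent, with a sleeping executor task standing in for the listener; AwaitPatternDemo and awaitWorker are hypothetical names, not part of the solution above. It shows why the boolean returned by await(...) is worth checking: a timeout does not throw, it just returns false.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AwaitPatternDemo {

    // Starts background work lasting workMillis and waits up to timeoutMillis.
    // Returns true if the worker signalled completion, false on timeout.
    static boolean awaitWorker(long workMillis, long timeoutMillis) {
        // the latch exists before the work is triggered, so the signal cannot be missed
        CountDownLatch done = new CountDownLatch(1);
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            worker.submit(() -> {
                try {
                    Thread.sleep(workMillis); // stand-in for the listener's work
                    done.countDown();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            worker.shutdownNow(); // interrupts the worker if it is still sleeping
        }
    }

    public static void main(String[] args) {
        System.out.println("fast worker: " + awaitWorker(10, 2000));
        System.out.println("slow worker: " + awaitWorker(2000, 50));
    }
}
```

In a real test, the false case should fail the test instead of letting the assertions run against work that has not finished.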