Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

spring testing @async method

I'm trying to test if @Async annotation of Spring is working as expected on my project. But It doesn't.

I have this test:

 @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(classes = GlobalConfiguration.class)
    public class ActivityMessageListenerTest {

    @Autowired
    private ActivityMessageListener activityMessageListener;

    private Long USER_ID = 1l;
    private Long COMPANY_ID = 2l;
    private Date DATE = new Date(10000000);
    private String CLASSNAME = "className";
    private Long CLASSPK = 14l;
    private Integer TYPE = 22;
    private String EXTRA_DATA = "extra";
    private Long RECIVED_USER_ID = 99l;

    @Before
    public void setup() throws Exception {
    }

    @Test
    public void testDoReceiveWithException() throws Exception {
        System.out.println("Current thread " +      Thread.currentThread().getName());
        Map<String, Object> values = new HashMap();
        values.put(ActivityMessageListener.PARAM_USER_ID, USER_ID);
        values.put(ActivityMessageListener.PARAM_COMPANY_ID, COMPANY_ID);
        values.put(ActivityMessageListener.PARAM_CREATE_DATE, DATE);
        values.put(ActivityMessageListener.PARAM_CLASS_NAME, CLASSNAME);
        values.put(ActivityMessageListener.PARAM_CLASS_PK, CLASSPK);
        values.put(ActivityMessageListener.PARAM_TYPE, TYPE);
        values.put(ActivityMessageListener.PARAM_EXTRA_DATA, EXTRA_DATA );
        values.put(ActivityMessageListener.PARAM_RECEIVED_USER_ID, RECIVED_USER_ID);

        Message message = new Message();
        message.setValues(values);
        MessageBusUtil.sendMessage(MKTDestinationNames.ACTIVITY_REGISTRY,      message);

    }
}

As you can see I'm printing the name of the current thread. The class containing the @Async method is:

 public class ActivityMessageListener extends BaseMessageListener {

    public static final String PARAM_USER_ID                = "userId";
    public static final String PARAM_COMPANY_ID             = "companyId";
    public static final String PARAM_CREATE_DATE            = "createDate";
    public static final String PARAM_CLASS_NAME             = "className";
    public static final String PARAM_CLASS_PK               = "classPK";
    public static final String PARAM_TYPE                   = "type";
    public static final String PARAM_EXTRA_DATA             = "extraData";
    public static final String PARAM_RECEIVED_USER_ID       = "receiverUserId";

    public ActivityMessageListener() {
        MessageBusUtil.addQueue(MKTDestinationNames.ACTIVITY_REGISTRY, this);
    }

    @Override
    @Async(value = "activityExecutor")
    public void doReceive(Message message) throws Exception {

        System.out.println("Current " + Thread.currentThread().getName());

        if (1> 0)
            throw new RuntimeException("lalal");
        Map<String, Object> parameters  = message.getValues();
        Long userId                     = (Long)parameters.get(ActivityMessageListener.PARAM_USER_ID);
        Long companyId                  = (Long)parameters.get(ActivityMessageListener.PARAM_COMPANY_ID);
        Date createDate                 = (Date)parameters.get(ActivityMessageListener.PARAM_CREATE_DATE);
        String className                = (String)parameters.get(ActivityMessageListener.PARAM_CLASS_NAME);
        Long classPK                    = (Long)parameters.get(ActivityMessageListener.PARAM_CLASS_PK);
        Integer type                    = (Integer)parameters.get(ActivityMessageListener.PARAM_TYPE);
        String extraData                = (String)parameters.get(ActivityMessageListener.PARAM_EXTRA_DATA);
        Long receiverUserId             = (Long)parameters.get(ActivityMessageListener.PARAM_RECEIVED_USER_ID);
        ActivityLocalServiceUtil.addActivity(userId, companyId, createDate, className, classPK, type, extraData, receiverUserId);
    }

}

Here I'm printing the name of the current thread inside of the @Async method, and the name is the same as before, main. So it's not working.

The GlobalConfiguration is:

@Configuration
@EnableAspectJAutoProxy
@EnableTransactionManagement
@ComponentScan({
        "com.shn.configurations",
...some packages...
})
public class GlobalConfiguration {...}

And inside one of the specified packages has the activityExecutor bean:

@Configuration
@EnableAsync(proxyTargetClass = true)
public class ExecutorConfiguration {

    @Bean
    public ActivityMessageListener activityMessageListener() {
        return new ActivityMessageListener();
    }

    @Bean
    public TaskExecutor activityExecutor()
    {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = 
        new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(10);
        threadPoolTaskExecutor.setMaxPoolSize(10);
        threadPoolTaskExecutor.setQueueCapacity(100);

        return threadPoolTaskExecutor;
    }
}

What I'm doing wrong?

like image 952
ilopezluna Avatar asked Feb 02 '15 22:02

ilopezluna


People also ask

How does @async work in Spring?

Simply put, annotating a method of a bean with @Async will make it execute in a separate thread. In other words, the caller will not wait for the completion of the called method. One interesting aspect in Spring is that the event support in the framework also has support for async processing if necessary.

Can we use @async on private method?

Never use @Async on top of a private method. In runtime, it will not able to create a proxy and, therefore, not work.


1 Answers

Tricky.

Asynchronous behavior is added through proxying.

Spring provides you with a proxy that wraps the actual object and performs the actual invocation in a separate thread.

It looks something like this (except most of this is done dynamically with CGLIB or JDK proxies and Spring handlers)

class ProxyListener extends ActivityMessageListener {
    private ActivityMessageListener real;
    public ProxyListener(ActivityMessageListener real) {
        this.real = real;
    }
    TaskExecutor executor; // injected
    @Override
    public void doReceive(Message message) throws Exception {
        executor.submit(() -> real.doReceive(message)); // in another thread
    }
}

ActivityMessageListener real = new ActivityMessageListener();
ProxyListener proxy = new ProxyListener(real);

Now, in a Spring world, you'd have a reference to the proxy object, not to the ActivityMessageListener. That is

ActivityMessageListener proxy = applicationContext.getBean(ActivityMessageListener.class);

would return a reference to the ProxyListener. Then, through polymorphism, invoking doReceive would go to the overriden Proxy#doReceive method which would invoke ActivityMessageListener#doReceive through delegation and you'd get your asynchronous behavior.

However, you're in a half Spring world.

Here

public ActivityMessageListener() {
    MessageBusUtil.addQueue(MKTDestinationNames.ACTIVITY_REGISTRY, this);
}

the reference this is actually referring to the real ActivityMessageListener, not to the proxy. So when, presumably, you send your message on the bus here

MessageBusUtil.sendMessage(MKTDestinationNames.ACTIVITY_REGISTRY,      message);

you're sending it to the real object, which doesn't have the proxy asynchronous behavior.

The full Spring solution would be to have the MessabeBus (and/or its queue) be Spring beans in which you can inject the fully process (proxied, autowired, initialized) beans.


In reality, since CGLIB proxies are really just subclasses of your types, so the ProxyListener above would actually also add itself to the bus since the super constructor would be invoked. It would seem though that only one MessageListener can register itself with a key, like MKTDestinationNames.ACTIVITY_REGISTRY. If this isn't the case, you'd have to show more of that code for explanation.


In your test, if you do

activityMessageListener.doReceive(message);

you should see that asynchronous behavior since activityMessageListener should hold a reference to the proxy.

like image 57
Sotirios Delimanolis Avatar answered Oct 13 '22 02:10

Sotirios Delimanolis