Using Spring 4.0's new ListenableFuture With Callbacks - Odd Results

I have a web app that takes an array of IDs, queries an external web service for each ID one at a time, and publishes each result as it arrives to a WebSocket client via a STOMP broker. I can get this to work using simple Futures, but I'm trying to use Spring 4's new ListenableFutures and provide a callback.

The working code uses a ThreadPoolTaskExecutor that is defined in my root config. I have a class called "SosQuery" with a method called "test" that is annotated with @Async and returns an AsyncResult. Here is my working code being called from a root context service class:

    @Override
    public void test(String[] oids) throws Exception {
        List<Future<String>> futures = new ArrayList<Future<String>>();

        for (String oid : oids) {
            futures.add(sosQuery.test(oid));
        }

        while (!futures.isEmpty()) {
            List<Future<String>> done = new ArrayList<Future<String>>();
            for (Future<String> future : futures) {
                if (future.isDone()) {
                    messagingTemplate.convertAndSendToUser("me", "/queue/observation", future.get());
                    done.add(future);
                }
            }
            futures.removeAll(done);
        }
    }

This works fine and I see the responses arriving in my client. I then modified the SosQuery method that is annotated with @Async to simply return a String, and created a SimpleAsyncTaskExecutor in my root config. Here is the modified method that uses ListenableFuture:

    @Override
    public void test(String[] oids) throws Exception {
        for (final String oid : oids) {
            ListenableFuture<String> task = asyncTaskExecutor.submitListenable(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    String result = sosQuery.test(oid);
                    logger.debug("result for sosQuery: " + result);
                    return result;
                }
            });

            task.addCallback(new ListenableFutureCallback<String>() {

                @Override
                public void onSuccess(String result){
                    if (result == null){
                        result = "ITS NULL";
                    }
                    messagingTemplate.convertAndSendToUser("me", "/queue/observation", result);
                }

                @Override
                public void onFailure(Throwable t){
                    logger.error("Error executing callback.", t);
                }
            });
        }
    }

I'm seeing weird behavior. When I deploy in debug mode, I can see that the call() method is executed and that the result is built properly by the SosQuery class, but my logger statement never appears in the logs. Immediately afterwards, the onSuccess method executes, but the result String is null.

The onFailure method never gets called and there is nothing distinctive in the logs. Documentation for ListenableFuture is scarce and mostly tied to AsyncRestTemplate; little exists on creating your own tasks. Does anybody have any idea what I might be doing wrong?

Bal asked May 15 '14 14:05



1 Answer

You should remove @Async from your SosQuery.test method.

    ListenableFuture<String> task = asyncTaskExecutor.submitListenable(new Callable<String>() {
        @Override
        public String call() throws Exception {
            String result = sosQuery.test(oid);
            logger.debug("result for sosQuery: " + result);
            return result;
        }
    });

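Here, the body of the call() method already runs on a separate thread. If the test method is also annotated with @Async, Spring hands the call off to yet another thread and the proxied method returns immediately. Because test now returns a plain String rather than a Future, that immediate return value is null, which is why onSuccess fires with a null result before the test method has actually completed.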
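The effect can be reproduced without Spring. The following is a minimal sketch using only java.util.concurrent, assuming the @Async proxy behaves like a wrapper that submits the real work to its own executor and returns null when the return type is not a Future. The class AsyncNullDemo and the methods slowQuery, asyncProxiedQuery and runViaCallable are hypothetical stand-ins, not part of the original code or of Spring.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncNullDemo {

    // Hypothetical stand-in for the real SosQuery.test: slow work on the calling thread.
    static String slowQuery(String oid) {
        try {
            Thread.sleep(100); // simulate web-service latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "observation-" + oid;
    }

    // Assumption: models an @Async proxy around a method that returns String.
    // The work is handed to another executor and its result is discarded,
    // so the caller gets null straight away.
    static String asyncProxiedQuery(ExecutorService proxyExecutor, String oid) {
        Callable<String> work = () -> slowQuery(oid);
        proxyExecutor.submit(work);
        return null;
    }

    // Runs the query inside a Callable on a task executor, as in the question,
    // either through the simulated proxy or directly.
    static String runViaCallable(boolean withAsyncProxy) {
        ExecutorService taskExecutor = Executors.newSingleThreadExecutor();
        ExecutorService proxyExecutor = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = () -> withAsyncProxy
                    ? asyncProxiedQuery(proxyExecutor, "42")
                    : slowQuery("42");
            return taskExecutor.submit(task).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            taskExecutor.shutdown();
            proxyExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("with proxy:    " + runViaCallable(true));
        System.out.println("without proxy: " + runViaCallable(false));
    }
}
```

With the simulated proxy in place the Callable completes with null. Without it, the Callable blocks until the query finishes and returns the real value, which is the behavior you want once the task is already running on the executor's thread.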

Another important note, from the SimpleAsyncTaskExecutor documentation:

    This implementation does not reuse threads! Consider a thread-pooling TaskExecutor implementation instead, in particular for executing a large number of short-lived tasks.
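To make the difference concrete, here is a rough sketch that counts how many threads get created for the same workload. It uses plain java.util.concurrent executors as stand-ins rather than Spring's SimpleAsyncTaskExecutor and ThreadPoolTaskExecutor, so treat it as an illustration of the idea only. ThreadReuseDemo, CountingFactory and the helper methods are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadReuseDemo {

    // Thread factory that counts how many threads it is asked to create.
    static class CountingFactory implements ThreadFactory {
        final AtomicInteger created = new AtomicInteger();

        @Override
        public Thread newThread(Runnable r) {
            created.incrementAndGet();
            return new Thread(r);
        }
    }

    // Submits the given number of trivial tasks and waits until all have run.
    static void runAndWait(Executor executor, int tasks) {
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            executor.execute(done::countDown);
        }
        try {
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Thread-per-task executor: every task starts a brand-new thread.
    static int threadsForThreadPerTask(int tasks) {
        CountingFactory factory = new CountingFactory();
        Executor perTask = command -> factory.newThread(command).start();
        runAndWait(perTask, tasks);
        return factory.created.get();
    }

    // Fixed-size pool: tasks are queued and run on a bounded set of reused threads.
    static int threadsForFixedPool(int poolSize, int tasks) {
        CountingFactory factory = new CountingFactory();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize, factory);
        try {
            runAndWait(pool, tasks);
        } finally {
            pool.shutdown();
        }
        return factory.created.get();
    }

    public static void main(String[] args) {
        System.out.println("thread-per-task: " + threadsForThreadPerTask(20) + " threads");
        System.out.println("fixed pool of 4: " + threadsForFixedPool(4, 20) + " threads");
    }
}
```

For 20 tasks the thread-per-task executor creates 20 threads, while the pool never creates more than its configured size. With one web-service call per ID, the same consideration applies to the executor used for submitListenable.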
Mani answered Sep 22 '22 10:09
