Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to deal with multiple ListenableFutures? (Spring)

I am writing a controller, that I need to make it asynchronous. How can I deal with a list of ListenableFuture? Because I have a list of URLs that I need to send GET request one by one, what is the best solution for it?

@RequestMapping(value = "/repositories", method = RequestMethod.GET)
    private void getUsername(@RequestParam(value = "username") String username) {
        System.out.println(username);
        List<ListenableFuture> futureList = githubRestAsync.getRepositoryLanguages(username);
        System.out.println(futureList.size());
}

In the service I use List<ListanbleFuture> which seems does not work, since it is asynchronous, in the controller method I cannot have the size of futureList to run a for loop on it for the callbacks.

public List<ListenableFuture> getRepositoryLanguages(String username){
      return getRepositoryLanguages(username, getUserRepositoriesFuture(username));
    }

private ListenableFuture getUserRepositoriesFuture(String username) throws HttpClientErrorException {
        HttpEntity entity = new HttpEntity(httpHeaders);
        ListenableFuture future = restTemplate.exchange(githubUsersUrl + username + "/repos", HttpMethod.GET, entity, String.class);
        return future;
    }
private List<ListenableFuture> getRepositoryLanguages(final String username, ListenableFuture<ResponseEntity<String>> future) {
        final List<ListenableFuture> futures = new ArrayList<>();
        future.addCallback(new ListenableFutureCallback<ResponseEntity<String>>() {
            @Override
            public void onSuccess(ResponseEntity<String> response) {
                ObjectMapper mapper = new ObjectMapper();
                try {
                    repositories = mapper.readValue(response.getBody(), new TypeReference<List<Repositories>>() {
                    });
                    HttpEntity entity = new HttpEntity(httpHeaders);
                    System.out.println("Repo size: " + repositories.size());
                    for (int i = 0; i < repositories.size(); i++) {
                        futures.add(restTemplate.exchange(githubReposUrl + username + "/" + repositories.get(i).getName() + "/languages", HttpMethod.GET, entity, String.class));
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("FAILURE in getRepositoryLanguages: " + throwable.getMessage());
            }
        });

        return futures;
    }

Should I use something like ListenableFuture<List> instead of List<ListenableFuture> ?

like image 339
secret Avatar asked Apr 03 '16 15:04

secret


2 Answers

It seems like you have a List<ListenableFuture<Result>>, but you want a ListenableFuture<List<Result>>, so you can take one action when all of the futures are complete.

public static <T> ListenableFuture<List<T>> allOf(final List<? extends ListenableFuture<? extends T>> futures) {
    // we will return this ListenableFuture, and modify it from within callbacks on each input future
    final SettableListenableFuture<List<T>> groupFuture = new SettableListenableFuture<>();

    // use a defensive shallow copy of the futures list, to avoid errors that could be caused by
    // someone inserting/removing a future from `futures` list after they call this method
    final List<? extends ListenableFuture<? extends T>> futuresCopy = new ArrayList<>(futures);

    // Count the number of completed futures with an AtomicInt (to avoid race conditions)
    final AtomicInteger resultCount = new AtomicInteger(0);
    for (int i = 0; i < futuresCopy.size(); i++) {
        futuresCopy.get(i).addCallback(new ListenableFutureCallback<T>() {
            @Override
            public void onSuccess(final T result) {
                int thisCount = resultCount.incrementAndGet();

                // if this is the last result, build the ArrayList and complete the GroupFuture
                if (thisCount == futuresCopy.size()) {
                   List<T> resultList = new ArrayList<T>(futuresCopy.size());
                    try {
                        for (ListenableFuture<? extends T> future : futuresCopy) {
                            resultList.add(future.get());
                        }
                        groupFuture.set(resultList);
                    } catch (Exception e) {
                        // this should never happen, but future.get() forces us to deal with this exception.
                        groupFuture.setException(e);
                    }
                }
            }

            @Override
            public void onFailure(final Throwable throwable) {
                groupFuture.setException(throwable);

                // if one future fails, don't waste effort on the others
                for (ListenableFuture future : futuresCopy) {
                    future.cancel(true);
                }
            }
        });
    }

    return groupFuture;
}
like image 183
Andrew Rueckert Avatar answered Sep 29 '22 09:09

Andrew Rueckert


Im not quite sure if you are starting a new project or working on a legacy one, but if the main requirement for you is none blocking and asynchronous rest service I would suggest you to have a look into upcoming Spring Framework 5 and it integration with reactive streams. Particularly Spring 5 will allow you to create fully reactive and asynchronous web services with little of coding.

So for example fully functional version of your code can be written with this small code snippet.

@RestController
public class ReactiveController {
    @GetMapping(value = "/repositories")
    public Flux<String> getUsername(@RequestParam(value = "username") String username) {
        WebClient client = WebClient.create(new ReactorClientHttpConnector());
        ClientRequest<Void> listRepoRequest = ClientRequest.GET("https://api.github.com/users/{username}/repos", username)
            .accept(MediaType.APPLICATION_JSON).header("user-agent", "reactive.java").build();

        return client.exchange(listRepoRequest).flatMap(response -> response.bodyToFlux(Repository.class)).flatMap(
            repository -> client
                    .exchange(ClientRequest
                            .GET("https://api.github.com/repos/{username}/{repo}/languages", username,
                                    repository.getName())
                            .accept(MediaType.APPLICATION_JSON).header("user-agent", "reactive.java").build())
                    .map(r -> r.bodyToMono(String.class)))
            .concatMap(Flux::merge);
    }

    static class Repository {
        private String name;

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }
    }

}

To run this code locally just clone the spring-boot-starter-web-reactive and copy the code into it.

The result is something like {"Java":50563,"JavaScript":11541,"CSS":1177}{"Java":50469}{"Java":130182}{"Shell":21222,"Makefile":7169,"JavaScript":1156}{"Java":30754,"Shell":7058,"JavaScript":5486,"Batchfile":5006,"HTML":4865} still you can map it to something more usable in asynchronous way :)

like image 25
Babl Avatar answered Sep 29 '22 11:09

Babl