
CompletableFuture for child requests

I am trying to understand CompletableFuture in Java 8. To solidify my understanding, I am making some REST calls with this library: https://github.com/AsyncHttpClient/async-http-client.

Note that a GET call with this library yields a Response object, which my wrapper methods expose as a Future<Response>.

Following is what I am trying to do:

  1. Call this URL which gives the list of users: https://jsonplaceholder.typicode.com/users
  2. Convert the Response to a list of User objects using GSON.
  3. Iterate over each User object in the list, get the user ID, and then fetch the list of posts made by that user from a URL such as: https://jsonplaceholder.typicode.com/posts?userId=1
  4. Convert each post response to a list of Post objects using GSON.
  5. Build a collection of UserPosts objects, each of which has a User object and a list of the posts made by that user.

    public class UserPosts {

        private final User user;
        private final List<Post> posts;

        public UserPosts(User user, List<Post> posts) {
            this.user = user;
            this.posts = posts;
        }

        @Override
        public String toString() {
            return "user = " + this.user + " \n" + "post = " + posts + " \n \n";
        }
    }

I currently have it implemented as follows:

package com.CompletableFuture;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.asynchttpclient.Response;

import com.http.HttpResponse;
import com.http.HttpUtil;
import com.model.Post;
import com.model.User;
import com.model.UserPosts;

/**
 * Created by vm on 8/20/18.
 */

class UserPostResponse {
    private final User user;
    private final Future<Response> postResponse;

    UserPostResponse(User user, Future<Response> postResponse) {
        this.user = user;
        this.postResponse = postResponse;
    }

    public User getUser() {
        return user;
    }

    public Future<Response> getPostResponse() {
        return postResponse;
    }
}

public class HttpCompletableFuture extends HttpResponse {
    private Function<Future<Response>, List<User>> userResponseToObject = user -> {
        try {
            return super.convertResponseToUser(Optional.of(user.get().getResponseBody())).get();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    };
    private Function<Future<Response>, List<Post>> postResponseToObject = post -> {
        try {
            return super.convertResponseToPost(Optional.of(post.get().getResponseBody())).get();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    };

    private Function<UserPostResponse, UserPosts> buildUserPosts = (userPostResponse) -> {
        try {
            return new UserPosts(userPostResponse.getUser(), postResponseToObject.apply(userPostResponse.getPostResponse()));
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    };

    private Function<User, UserPostResponse> getPostResponseForUser = user -> {
        Future<Response> resp = super.getPostsForUser(user.getId());
        return new UserPostResponse(user, resp);
    };

    public HttpCompletableFuture() {
        super(HttpUtil.getInstance());
    }

    public List<UserPosts> getUserPosts() {

        try {
            CompletableFuture<List<UserPosts>> usersFuture = CompletableFuture
                    .supplyAsync(() -> super.getUsers())
                    .thenApply(userResponseToObject)
                    .thenApply((List<User> users)-> users.stream().map(getPostResponseForUser).collect(Collectors.toList()))
                    .thenApply((List<UserPostResponse> userPostResponses ) -> userPostResponses.stream().map(buildUserPosts).collect(Collectors.toList()));

            List<UserPosts> users = usersFuture.get();
            System.out.println(users);
            return users;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

}

However, I am not sure this is the right approach. Specifically, the userResponseToObject and postResponseToObject functions call get() on the Future, which blocks the calling thread.

Is there a better way to implement this?

Vinod Mohanan asked Oct 17 '22 14:10



1 Answer

If you plan to use CompletableFuture, work with the ListenableFuture that the async-http-client library returns rather than a plain Future. A ListenableFuture can be converted to a CompletableFuture by calling toCompletableFuture().

The advantage of using CompletableFuture is that you can write logic that deals with the Response object without having to know anything about futures or threads. Suppose you wrote the following four methods, two to make requests and two to parse responses:

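// Issues the GET request for the list of users.
ListenableFuture<Response> requestUsers() {
    // implementation omitted
}

// Issues the GET request for the posts of the given user.
ListenableFuture<Response> requestPosts(User u) {
    // implementation omitted
}

// Parses the body of the users response; needs no knowledge of futures.
List<User> parseUsers(Response r) {
    // implementation omitted
}

// Parses the body of a posts response for the given user.
List<UserPost> parseUserPosts(Response r, User u) {
    // implementation omitted
}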
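
To illustrate why it helps to keep the parsing methods free of futures, here is a minimal sketch that uses only the JDK. The names parseTitles and requestPostsBody are hypothetical stand-ins (not part of async-http-client), and the comma-separated body is a made-up substitute for the JSON that GSON would parse. The parser can be called directly, and the same method reference can be attached to an asynchronous result with thenApply:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class PlainParsing {

    // Plain function: takes a response body and returns titles. No futures, no threads.
    // Assumption: a comma-separated body stands in for the real JSON payload.
    static List<String> parseTitles(String body) {
        if (body.isEmpty()) {
            return Collections.emptyList();
        }
        return Arrays.asList(body.split(","));
    }

    // Hypothetical stand-in for requestPosts(u).toCompletableFuture().
    static CompletableFuture<String> requestPostsBody(int userId) {
        return CompletableFuture.supplyAsync(
                () -> "first post of " + userId + ",second post of " + userId);
    }

    public static void main(String[] args) {
        // The parser can be exercised directly, with no future involved.
        System.out.println(parseTitles("a,b"));

        // thenApply attaches the same function to an asynchronous result.
        CompletableFuture<List<String>> titles =
                requestPostsBody(1).thenApply(PlainParsing::parseTitles);

        // join() is used here only so the demo prints before the JVM exits.
        System.out.println(titles.join());
    }
}
```

Because parseTitles never touches a future, it can be unit-tested with plain strings, while the asynchronous wiring stays in one place.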

Now we can write a non-blocking method that retrieves posts for a given user:

CompletableFuture<List<UserPost>> userPosts(User u) {
    return requestPosts(u)
        .toCompletableFuture()
        .thenApply(r -> parseUserPosts(r, u));
}

and a blocking method to read all posts for all users:

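// assumes: import static java.util.stream.Collectors.toList;
List<UserPost> getAllPosts() {
    // issue all requests; this join() blocks only until the user list has
    // arrived and one post request per user has been started
    List<CompletableFuture<List<UserPost>>> postFutures = requestUsers()
            .toCompletableFuture()
            .thenApply(usersResponse -> parseUsers(usersResponse)
                    .stream()
                    .map(this::userPosts)
                    .collect(toList())
            ).join();

    // collect the results; each join() blocks until that user's posts are parsed
    return postFutures.stream()
            .map(CompletableFuture::join)
            .flatMap(List::stream)
            .collect(toList());
}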
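
The getAllPosts method above still blocks in join(). If the caller can accept a future instead of a list, one possible variant chains the second round of requests with thenCompose and merges the per-user futures with CompletableFuture.allOf. The following is a minimal sketch using only the JDK. fetchUserIds and fetchPostTitles are hypothetical stand-ins for requestUsers and requestPosts plus parsing, and integers and strings stand in for User and UserPost:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class NonBlockingPosts {

    // Hypothetical stand-in for requestUsers().toCompletableFuture().thenApply(parseUsers).
    static CompletableFuture<List<Integer>> fetchUserIds() {
        return CompletableFuture.supplyAsync(() -> Arrays.asList(1, 2, 3));
    }

    // Hypothetical stand-in for the per-user posts request plus parsing.
    static CompletableFuture<List<String>> fetchPostTitles(int userId) {
        return CompletableFuture.supplyAsync(
                () -> Arrays.asList("post-a of user " + userId, "post-b of user " + userId));
    }

    // Turns a list of futures into one future of a list, preserving order.
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                // allOf has completed, so every join() below returns immediately
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    // Starts one posts request per user and combines them into a single future.
    static CompletableFuture<List<List<String>>> postsPerUser(List<Integer> userIds) {
        List<CompletableFuture<List<String>>> futures = userIds.stream()
                .map(NonBlockingPosts::fetchPostTitles)
                .collect(Collectors.toList());
        return sequence(futures);
    }

    // No call in this chain waits for a result; the caller decides when to wait, if at all.
    static CompletableFuture<List<String>> allPostTitles() {
        return fetchUserIds()
                .thenCompose(NonBlockingPosts::postsPerUser)
                .thenApply(perUser -> perUser.stream()
                        .flatMap(List::stream)
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        // join() appears only at the outermost edge so the demo can print its result.
        System.out.println(allPostTitles().join());
    }
}
```

The design choice here is to return the CompletableFuture and leave waiting to the caller. Whether that is preferable to the blocking version depends on whether the calling code is itself asynchronous.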
Misha answered Nov 15 '22 07:11
