
Asynchronously populating a Java Map and returning it as a future

I've got a map of objects which are expensive to create, and so I want to create the objects and fill the map in parallel with other processes in my application. Only when the main thread actually needs to access the map should the application wait for the asynchronous task(s) populating the map to finish. How can I most elegantly accomplish this?

Current approach

Currently, I can create each individual object in the map asynchronously using CompletableFuture.runAsync(Runnable, Executor), as in the example code below, but I'm unsure how to build a Future/CompletableFuture-type mechanism that returns the Map itself once it is ready:

public static class AsynchronousMapPopulator {

    private final Executor backgroundJobExecutor;

    public AsynchronousMapPopulator(final Executor backgroundJobExecutor) {
        this.backgroundJobExecutor = backgroundJobExecutor;
    }

    public ConcurrentMap<String, Integer> apply(final Map<String,Integer> input) {
        final ConcurrentMap<String, Integer> result = new ConcurrentHashMap<>(input.size());
        final Stream.Builder<CompletableFuture<Void>> incrementingJobs = Stream.builder();
        for (final Entry<String, Integer> entry : input.entrySet()) {
            final String className = entry.getKey();
            final Integer oldValue = entry.getValue();
            final CompletableFuture<Void> incrementingJob = CompletableFuture.runAsync(() -> {
                result.put(className, oldValue + 1);
            }, backgroundJobExecutor);
            incrementingJobs.add(incrementingJob);
        }
        // TODO: This blocks until all jobs are done; instead, return a
        // future to the caller somehow
        CompletableFuture.allOf(incrementingJobs.build().toArray(CompletableFuture[]::new)).join();
        return result;
    }
}

However, with the code above, a call to AsynchronousMapPopulator.apply(Map<String,Integer>) blocks until the method returns the entirely populated ConcurrentMap<String,Integer>. How can I turn this into something like a Future<Map<String,Integer>> so that I can use the result at a later time, like this?

Executor someExecutor = ForkJoinPool.commonPool();
Future<Map<String,Integer>> futureClassModels = new AsynchronousMapPopulator(someExecutor).apply(wordClassObservations);
...
// Do lots of other stuff
...
Map<String,Integer> completedModels = futureClassModels.get();
errantlinguist asked Nov 07 '17 11:11


1 Answer

As @Holger states in his comment, you should avoid calling .join(), which blocks the calling thread, and instead chain thenApply() onto the future returned by allOf(), e.g. like this:

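// Nested class; the enclosing file needs imports for java.util.Map,
// java.util.Map.Entry, java.util.stream.Stream and java.util.concurrent.*
public static class AsynchronousMapPopulator {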

    private final Executor backgroundJobExecutor;

    public AsynchronousMapPopulator(final Executor backgroundJobExecutor) {
        this.backgroundJobExecutor = backgroundJobExecutor;
    }

    public Future<Map<String, Integer>> apply(final Map<String,Integer> input) {
        final ConcurrentMap<String, Integer> result = new ConcurrentHashMap<>(input.size());
        final Stream.Builder<CompletableFuture<Void>> incrementingJobs = Stream.builder();
        for (final Entry<String, Integer> entry : input.entrySet()) {
            final String className = entry.getKey();
            final Integer oldValue = entry.getValue();
            final CompletableFuture<Void> incrementingJob = CompletableFuture.runAsync(() -> {
                result.put(className, oldValue + 1);
            }, backgroundJobExecutor);
            incrementingJobs.add(incrementingJob);
        }
        // using thenApply instead of join here:
        return CompletableFuture.allOf(
                incrementingJobs.build().toArray(
                    CompletableFuture[]::new
                )
            ).thenApply(x -> result);
    }
}
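
For reference, here is a minimal, self-contained sketch of the same idea that can be compiled and run on its own. The class name AsyncMapDemo, the method name populateAsync and the sample data are made up for illustration and are not part of the original code; the "+ 1" stands in for the expensive object creation, as in the question.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; names are illustrative only.
class AsyncMapDemo {

    // Starts one background job per entry and returns immediately with a
    // future that completes once every job has written to the map.
    public static CompletableFuture<Map<String, Integer>> populateAsync(
            final Map<String, Integer> input, final Executor executor) {
        final ConcurrentMap<String, Integer> result = new ConcurrentHashMap<>();
        final CompletableFuture<?>[] jobs = input.entrySet().stream()
                .map(entry -> CompletableFuture.runAsync(
                        // Placeholder for the expensive computation
                        () -> result.put(entry.getKey(), entry.getValue() + 1),
                        executor))
                .toArray(CompletableFuture[]::new);
        // No join() here: thenApply only registers what happens after allOf completes
        return CompletableFuture.allOf(jobs).thenApply(ignored -> result);
    }

    public static void main(final String[] args) throws Exception {
        final Map<String, Integer> input = new HashMap<>();
        input.put("foo", 1);
        input.put("bar", 2);

        final ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            final CompletableFuture<Map<String, Integer>> future = populateAsync(input, executor);
            // ... the main thread is free to do other work here ...
            // Block only at the point where the map is actually needed
            final Map<String, Integer> completed = future.get();
            System.out.println(completed);
        } finally {
            executor.shutdown();
        }
    }
}
```

Returning CompletableFuture rather than plain Future is a design choice of this sketch: it is still usable as a Future, but callers can also chain further stages onto it. If any of the background jobs throws, the future returned by allOf() completes exceptionally, so get() on the combined future throws an ExecutionException instead of returning a partially filled map.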
Lars Gendner answered Oct 19 '22 02:10