I've got a map of objects which are expensive to create, and so I want to create the objects and fill the map in parallel with other processes in my application. Only when the main thread actually needs to access the map should the application wait for the asynchronous task(s) populating the map to finish. How can I most elegantly accomplish this?
Currently, I am able to create each individual object in the map itself asynchronously using CompletableFuture.runAsync(Runnable, Executor)
analogously to in the example code below, but I'm unsure of how I can build a Future
/CompletableFuture
-type mechanism for returning the Map
itself when ready:
public static class AsynchronousMapPopulator {
private final Executor backgroundJobExecutor;
public AsynchronousMapPopulator(final Executor backgroundJobExecutor) {
this.backgroundJobExecutor = backgroundJobExecutor;
}
public ConcurrentMap<String, Integer> apply(final Map<String,Integer> input) {
final ConcurrentMap<String, Integer> result = new ConcurrentHashMap<>(input.size());
final Stream.Builder<CompletableFuture<Void>> incrementingJobs = Stream.builder();
for (final Entry<String, Integer> entry : input.entrySet()) {
final String className = entry.getKey();
final Integer oldValue = entry.getValue();
final CompletableFuture<Void> incrementingJob = CompletableFuture.runAsync(() -> {
result.put(className, oldValue + 1);
}, backgroundJobExecutor);
incrementingJobs.add(incrementingJob);
}
// TODO: This blocks until the training is done; Instead, return a
// future to the caller somehow
CompletableFuture.allOf(incrementingJobs.build().toArray(CompletableFuture[]::new)).join();
return result;
}
}
However, with the code above, when code calls AsynchronousTest.create(Map<String,Integer)
, it already blocks until the method returns the entirely-populated ConcurrentMap<String,Integer>
; How can I turn this into something like a Future<Map<String,Integer>>
so that I can use it at a later time?:
Executor someExecutor = ForkJoinPool.commonPool();
Future<Map<String,Integer>> futureClassModels = new AsynchronousMapPopulator(someExecutor).apply(wordClassObservations);
...
// Do lots of other stuff
...
Map<String,Integer> completedModels = futureClassModels.get();
Asynchronous Programming in JAVA 8 and its Implementation- Completable Future Asynchronous programming is a means of parallel programming in which a unit of work runs separately from the main application thread and notifies the calling thread of its completion, failure, or progress.
The first way to implement async in Java is to use the Runnable interface and Thread class which is found from JDK 1.0. Any class can implement Runnable and override the run () method or can extend Thread and do the same.
Asynchronous program: It also utilizes threads to complete the tasks in lesser time like the parallel programs. But, there is a difference as it does not wait for the tasks list to complete, tasks are getting done asynchronously and the program is busy doing other stuff. Java 1 had threads to do parallel programming.
Future represents the result of an asynchronous computation. Methods are provided to check if the computation is complete, to wait for its completion, and to retrieve the result of the computation. In simple terms, a future is promise to hold the result of some operation once that operation completes. Future was introduced in Java 5.
As @Holger states in his comment, you must avoid calling .join()
and rely upon thenApply()
instead, e.g. like this:
public static class AsynchronousMapPopulator {
private final Executor backgroundJobExecutor;
public AsynchronousMapPopulator(final Executor backgroundJobExecutor) {
this.backgroundJobExecutor = backgroundJobExecutor;
}
public Future<Map<String, Integer>> apply(final Map<String,Integer> input) {
final ConcurrentMap<String, Integer> result = new ConcurrentHashMap<>(input.size());
final Stream.Builder<CompletableFuture<Void>> incrementingJobs = Stream.builder();
for (final Entry<String, Integer> entry : input.entrySet()) {
final String className = entry.getKey();
final Integer oldValue = entry.getValue();
final CompletableFuture<Void> incrementingJob = CompletableFuture.runAsync(() -> {
result.put(className, oldValue + 1);
}, backgroundJobExecutor);
incrementingJobs.add(incrementingJob);
}
// using thenApply instead of join here:
return CompletableFuture.allOf(
incrementingJobs.build().toArray(
CompletableFuture[]::new
)
).thenApply(x -> result);
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With