I'm familiar with functional programming, mostly in Scala and JavaScript. I'm working on a Java 8 project and I'm not sure how I'm supposed to run through a list/stream of items, perform some side effect for each of them in parallel using a custom thread pool, and return an object on which it's possible to listen for completion (whether it's a success or a failure).
Currently I have the following code. It seems to work (I'm using the Play framework's Promise implementation as the return type), but it doesn't seem ideal, because ForkJoinPool is not meant to be used for IO-intensive work in the first place.
    public static F.Promise<Void> performAllItemsBackup(Stream<Item> items) {
        ForkJoinPool pool = new ForkJoinPool(3);
        ForkJoinTask<F.Promise<Void>> result = pool
            .submit(() -> {
                try {
                    items.parallel().forEach(performSingleItemBackup);
                    return F.Promise.<Void>pure(null);
                } catch (Exception e) {
                    return F.Promise.<Void>throwing(e);
                }
            });
        try {
            return result.get();
        } catch (Exception e) {
            throw new RuntimeException("Unable to get result", e);
        }
    }
Can someone give me a more idiomatic implementation of the above function? Ideally it would not use the ForkJoinPool, would have a more standard return type, and would use the most recent Java 8 APIs. I'm not sure which one I'm supposed to use among CompletableFuture, CompletionStage, ForkJoinTask...
A canonical solution would be
    public static CompletableFuture<Void> performAllItemsBackup(Stream<Item> items) {
        ForkJoinPool pool = new ForkJoinPool(3);
        try {
            return CompletableFuture.allOf(
                items.map(CompletableFuture::completedFuture)
                     .map(f -> f.thenAcceptAsync(performSingleItemBackup, pool))
                     .toArray(CompletableFuture<?>[]::new));
        } finally {
            pool.shutdown();
        }
    }
Note that the interaction between ForkJoin pool and parallel streams is an unspecified implementation detail you should not rely on. In contrast, CompletableFuture provides a dedicated API for providing an Executor. It doesn’t even have to be a ForkJoinPool:
    public static CompletableFuture<Void> performAllItemsBackup(Stream<Item> items) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            return CompletableFuture.allOf(
                items.map(CompletableFuture::completedFuture)
                     .map(f -> f.thenAcceptAsync(performSingleItemBackup, pool))
                     .toArray(CompletableFuture<?>[]::new));
        } finally {
            pool.shutdown();
        }
    }
In either case, you should shut down the executor explicitly instead of relying on automatic cleanup.
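To make the shutdown behavior concrete, here is a minimal self-contained sketch of the same pattern. `Item` and `performSingleItemBackup` are not shown in the question, so the class name `BackupSketch`, the method `performAll`, the generic type `T`, and the `action` parameter are illustrative stand-ins rather than part of the original code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.stream.Stream;

class BackupSketch {
    // Hypothetical generic variant of performAllItemsBackup: runs `action` once
    // per item on a dedicated 3-thread pool and returns a future that completes
    // when every action has finished (or exceptionally if any of them failed).
    static <T> CompletableFuture<Void> performAll(Stream<T> items, Consumer<? super T> action) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            return CompletableFuture.allOf(
                items.map(CompletableFuture::completedFuture)
                     .map(f -> f.thenAcceptAsync(action, pool))
                     .toArray(CompletableFuture<?>[]::new));
        } finally {
            // shutdown() rejects new tasks but lets the already submitted ones run;
            // the worker threads exit once the queue has drained.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        ConcurrentLinkedQueue<String> done = new ConcurrentLinkedQueue<>();
        // join() is used here only so the demo waits before printing.
        performAll(Stream.of("a", "b", "c", "d"), done::add).join();
        System.out.println("items processed: " + done.size());
    }
}
```

Because every task is handed to the pool while the stream is being collected into the array, all submissions have happened by the time the finally block runs, so calling `shutdown()` there does not drop any work.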
If you need an F.Promise<Void> result, you can use
    public static F.Promise<Void> performAllItemsBackup(Stream<Item> items) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            return CompletableFuture.allOf(
                items.map(CompletableFuture::completedFuture)
                     .map(f -> f.thenAcceptAsync(performSingleItemBackup, pool))
                     .toArray(CompletableFuture<?>[]::new))
                .handle((v, e) -> e != null ? F.Promise.<Void>throwing(e) : F.Promise.pure(v))
                .join();
        } finally {
            pool.shutdown();
        }
    }
But note that this, like your original code, only returns when the operation has been completed, while the methods returning a CompletableFuture allow the operations to run asynchronously until the caller invokes join or get.
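With the CompletableFuture-returning variants, the caller does not have to block at all: it can attach a callback that fires on success or failure. The following is a rough sketch of that idea; the class `CompletionListenerSketch`, the helper `unwrap`, the method `describeOutcome`, and the status strings are all made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class CompletionListenerSketch {
    // Dependent stages frequently receive the failure wrapped in a
    // CompletionException, so look at the cause when there is one.
    static Throwable unwrap(Throwable e) {
        return (e instanceof CompletionException && e.getCause() != null) ? e.getCause() : e;
    }

    // Maps either outcome to a status string. Nothing blocks here: handle()
    // only registers a function that runs once `backup` has completed.
    static CompletableFuture<String> describeOutcome(CompletableFuture<Void> backup) {
        return backup.handle((ignored, error) ->
            error == null ? "backup finished"
                          : "backup failed: " + unwrap(error).getMessage());
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failing = CompletableFuture.runAsync(() -> {
            throw new IllegalStateException("disk full");
        });
        CompletableFuture<Void> succeeding = CompletableFuture.completedFuture(null);
        // join() is used only so the demo prints before the JVM exits.
        System.out.println(describeOutcome(failing).join());
        System.out.println(describeOutcome(succeeding).join());
    }
}
```

In real code the result of `performAllItemsBackup` would take the place of the hand-made futures in `main`, and the status strings would typically be replaced by logging or a follow-up action.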
To return a truly asynchronous Promise, you have to wrap the entire operation, e.g.
    public static F.Promise<Void> performAllItemsBackup(Stream<Item> stream) {
        return F.Promise.pure(stream).flatMap(items -> {
            ExecutorService pool = Executors.newFixedThreadPool(3);
            try {
                return CompletableFuture.allOf(
                    items.map(CompletableFuture::completedFuture)
                         .map(f -> f.thenAcceptAsync(performSingleItemBackup, pool))
                         .toArray(CompletableFuture<?>[]::new))
                    .handle((v, e) -> e != null ? F.Promise.<Void>throwing(e) : F.Promise.pure(v))
                    .join();
            } finally {
                pool.shutdown();
            }
        });
    }
But it’s better to settle on one API instead of jumping back and forth between two different APIs.