I'm toying with Java 8's streams and CompletableFutures. My pre-existing code has a class that takes a single URL and downloads it:
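public class FileDownloader implements Runnable {
    private final URL target;
    // new URL(String) throws the checked MalformedURLException, so it must be declared
    public FileDownloader(String target) throws MalformedURLException {
        this.target = new URL(target);
    }
    public void run() { /* do it */ }
}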
Now, this class gets its information from another part that emits a List<String> (a number of targets on a single host).
I've switched the surrounding code to CompletableFuture:
public class Downloader {
    public static void main(String[] args) {
        List<String> hosts = fetchTargetHosts();
        for (String host : hosts) {
            HostDownloader worker = new HostDownloader(host);
            CompletableFuture<List<String>> future =
                CompletableFuture.supplyAsync(worker);
            future.thenAcceptAsync((files) -> {
                for (String target : files) {
                    new FileDownloader(target).run();
                }
            });
        }
    }
    public static class HostDownloader implements Supplier<List<String>> {
        /* not shown */
    }
    /* My implementation should either be Runnable or Consumer.
       Please suggest based on an idiomatic approach to the main loop.
    */
    public static class FileDownloader implements Runnable, Consumer<String> {
        private String target;
        public FileDownloader(String target) {
            this.target = target;
        }
        @Override
        public void run() { accept(this.target); }
        @Override
        public void accept(String target) {
            try (Writer output = new FileWriter("/tmp/blubb")) {
                output.write(new URL(target).getContent().toString());
            } catch (IOException e) { /* just for demo */ }
        }
    }
}
Now, this doesn't feel natural. I'm producing a stream of Strings and my FileDownloader consumes one of them at a time. Is there a ready-made way to enable my single-value Consumer to work with Lists, or am I stuck with the for loop here?
I know it's trivial to move the loop into accept and just make a Consumer<List<String>>; that's not the point.
There is no point in dissolving two directly dependent steps into two asynchronous steps. They are still dependent and if the separation has any effect, it won’t be a positive one.
You can simply use
List<String> hosts = fetchTargetHosts();
FileDownloader fileDownloader = new FileDownloader();
for (String host : hosts)
    CompletableFuture.runAsync(() ->
        new HostDownloader(host).get().forEach(fileDownloader));
or, assuming that FileDownloader does not have mutable state regarding a download:
for (String host : hosts)
    CompletableFuture.runAsync(() ->
        new HostDownloader(host).get().parallelStream().forEach(fileDownloader));
This still has the same level of concurrency as your original approach using supplyAsync plus thenAcceptAsync, simply because these two dependent steps can’t run concurrently anyway, so the simple solution is to put both steps into one concise operation that will be executed asynchronously.
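As for the ready-made adapter the question asks about: Iterable.forEach already accepts a single-value Consumer, which is what the snippets above rely on. The following is only a minimal sketch of that idea; the class name ForEachAdapter and the helper forEachOf are made up for illustration, and the supplier returns fixed strings instead of doing any real download.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class ForEachAdapter {

    // Hypothetical helper: lifts a per-element Consumer into a Consumer of whole lists
    // by delegating to Iterable.forEach.
    public static <T> Consumer<List<T>> forEachOf(Consumer<? super T> single) {
        return list -> list.forEach(single);
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        Consumer<String> single = seen::add; // stands in for a FileDownloader

        // Stand-in for HostDownloader: supplies a fixed list of targets.
        CompletableFuture
            .supplyAsync(() -> Arrays.asList("target-1", "target-2"))
            .thenAccept(forEachOf(single))
            .join(); // wait until every element has been consumed

        System.out.println(seen);
    }
}
```

In practice the helper is rarely needed, since writing files -> files.forEach(fileDownloader) inline does the same thing.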
However, at this point it’s worth noting that using CompletableFuture at all is not recommended for this operation. As its documentation states:
- All async methods without an explicit Executor argument are performed using the ForkJoinPool.commonPool()
The problem with the common pool is that its pre-configured concurrency level depends on the number of CPU cores and won’t be adjusted if threads are blocked during an I/O operation. In other words, it is unsuitable for I/O operations.
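To check what that limit is on a particular machine, a small sketch like the following can print the common pool's target parallelism next to the core count. The class name CommonPoolInfo is made up for illustration; the printed numbers depend on the machine and on any JVM settings that override the default.

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolInfo {

    // Target parallelism of the pool that async methods use when no Executor is passed.
    public static int commonPoolParallelism() {
        return ForkJoinPool.commonPool().getParallelism();
    }

    public static void main(String[] args) {
        System.out.println("available processors:    "
            + Runtime.getRuntime().availableProcessors());
        System.out.println("common pool parallelism: " + commonPoolParallelism());
    }
}
```

With only that many workers, a handful of downloads blocked on the network is enough to keep all of them busy waiting.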
Unlike Stream, CompletableFuture allows you to specify an Executor for the async operations, so you can configure your own Executor to be suitable for I/O operations. On the other hand, when you deal with an Executor anyway, there is no need for CompletableFuture at all, at least not for such a simple task:
List<String> hosts = fetchTargetHosts();
int concurrentHosts = 10;
int concurrentConnections = 100;
ExecutorService hostEs = Executors.newWorkStealingPool(concurrentHosts);
ExecutorService connEs = Executors.newWorkStealingPool(concurrentConnections);
FileDownloader fileDownloader = new FileDownloader();
for (String host : hosts) hostEs.execute(() -> {
    for (String target : new HostDownloader(host).get())
        connEs.execute(() -> fileDownloader.accept(target));
});
At this point you may consider either inlining the code of FileDownloader.accept into the lambda expression or reverting it to be a Runnable, so that you can change the inner loop’s statement to connEs.execute(new FileDownloader(target)).
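If you would rather keep CompletableFuture, for example to wait for all downloads to finish, the overloads that take an Executor can be used with a dedicated pool. The following is only a sketch under assumptions: the class name ExecutorBackedDownloads, the method downloadAll, and the pool size of 10 are invented for illustration, and the file listing and file handling are passed in as plain functions so that the example runs without any network access.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;

public class ExecutorBackedDownloads {

    // One async task per host: list the host's files, then hand each one to fileHandler.
    // All tasks run on the supplied executor rather than on the common pool.
    public static CompletableFuture<Void> downloadAll(
            List<String> hosts,
            Function<String, List<String>> listFiles,
            Consumer<String> fileHandler,
            Executor executor) {
        List<CompletableFuture<Void>> perHost = new ArrayList<>();
        for (String host : hosts) {
            perHost.add(CompletableFuture.runAsync(
                () -> listFiles.apply(host).forEach(fileHandler), executor));
        }
        // Completes once every per-host task has completed.
        return CompletableFuture.allOf(perHost.toArray(new CompletableFuture[0]));
    }

    public static void main(String[] args) {
        ExecutorService ioPool = Executors.newFixedThreadPool(10); // assumed size
        Queue<String> handled = new ConcurrentLinkedQueue<>();     // thread-safe sink
        try {
            downloadAll(
                Arrays.asList("a.example", "b.example"),          // stand-in hosts
                host -> Arrays.asList(host + "/1", host + "/2"),  // stand-in for HostDownloader
                handled::add,                                     // stand-in for FileDownloader
                ioPool).join();
        } finally {
            ioPool.shutdown(); // let the JVM exit once the work is done
        }
        System.out.println(handled.size() + " targets handled");
    }
}
```

The handler is shared between threads here, so it has to be safe for concurrent use, which is the same condition as for the shared FileDownloader instance above.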