
Best way to sequentially pass list values to single value consumer?

I'm toying with Java 8's streams and CompletableFutures. My pre-existing code has a class that takes a single URL and downloads it:

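import java.net.MalformedURLException;
import java.net.URL;

public class FileDownloader implements Runnable {
    private final URL target;
    // new URL(String) throws a checked exception, so the constructor must declare it
    public FileDownloader(String target) throws MalformedURLException {
        this.target = new URL(target);
    }
    public void run() { /* do it */ }
}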

Now, this class gets its information from another component that emits a List<String> (a number of targets on a single host).

I've switched the surrounding code to CompletableFuture:

public class Downloader {
    public static void main(String[] args) {
        List<String> hosts = fetchTargetHosts();
        for (String host : hosts) {
            HostDownloader worker = new HostDownloader(host);
            CompletableFuture<List<String>> future = 
                CompletableFuture.supplyAsync(worker);
            future.thenAcceptAsync((files) -> {
                for (String target : files) {
                    new FileDownloader(target).run();
                }
            });
        }
    }

    public static class HostDownloader implements Supplier<List<String>> {
        /* not shown */ 
    }
    /* My implementation should be either a Runnable or a Consumer.
       Please suggest one based on an idiomatic approach to the main loop.
     */
    public static class FileDownloader implements Runnable, Consumer<String> { 
        private String target;
        public FileDownloader(String target) {
            this.target = target;
        }

        @Override
        public void run() { accept(this.target); }

        @Override
        public void accept(String target) {
            try (Writer output = new FileWriter("/tmp/blubb")) {
                output.write(new URL(target).getContent().toString());
            } catch (IOException e) { /* just for demo */ }
        }
    }
}

Now, this doesn't feel natural. I'm producing a stream of Strings and my FileDownloader consumes one of them at a time. Is there a ready-made way to let my single-value Consumer work with Lists, or am I stuck with the for loop here?

I know it's trivial to move the loop into accept and just make it a Consumer<List<String>>; that's not the point.

mabi asked May 20 '15 09:05


1 Answer

There is no point in splitting two directly dependent steps into two asynchronous steps. They remain dependent, and if the separation has any effect, it won’t be a positive one.

You can simply use

List<String> hosts = fetchTargetHosts();
// assumes FileDownloader has a no-arg constructor; only accept(String) is used here
FileDownloader fileDownloader = new FileDownloader();
for (String host : hosts)
    CompletableFuture.runAsync(() ->
        new HostDownloader(host).get().forEach(fileDownloader));

or, assuming that FileDownloader does not have mutable state regarding a download:

for(String host: hosts)
    CompletableFuture.runAsync(()->
        new HostDownloader(host).get().parallelStream().forEach(fileDownloader));

This still has the same level of concurrency as your original approach using supplyAsync plus thenAcceptAsync, simply because these two dependent steps can’t run concurrently anyway, so the simple solution is to put both steps into one concise operation that will be executed asynchronously.
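The ready-made piece the question asks for is List.forEach, which accepts any Consumer of the element type. A minimal, network-free sketch of the combined step follows; hostDownloader and the recording consumer are hypothetical stand-ins for HostDownloader and FileDownloader, and the final join is only there so the result can be inspected.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
import java.util.function.Supplier;

class ForEachSketch {
    // Hypothetical stand-in for HostDownloader: returns fixed targets
    // instead of contacting the host.
    static Supplier<List<String>> hostDownloader(String host) {
        return () -> Arrays.asList(host + "/a", host + "/b");
    }

    // One async task per host; listing and consuming happen in the same task.
    static Queue<String> downloadAll(List<String> hosts) {
        Queue<String> consumed = new ConcurrentLinkedQueue<>();
        Consumer<String> fileDownloader = consumed::add; // stand-in for FileDownloader
        CompletableFuture<?>[] tasks = hosts.stream()
            .map(host -> CompletableFuture.runAsync(
                () -> hostDownloader(host).get().forEach(fileDownloader)))
            .toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(tasks).join(); // wait so the caller sees all results
        return consumed;
    }

    public static void main(String[] args) {
        // Element order depends on scheduling.
        System.out.println(downloadAll(Arrays.asList("h1", "h2")));
    }
}
```

Because the consumer is shared across tasks, it has to be thread-safe, which is the same condition the parallelStream variant places on FileDownloader.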


However, at this point it’s worth noting that the entire use of CompletableFuture is not recommended for this operation. As its documentation states:

  • All async methods without an explicit Executor argument are performed using the ForkJoinPool.commonPool()

The problem with the common pool is that its pre-configured concurrency level depends on the number of CPU cores and won’t be adjusted if threads are blocked during an I/O operation. In other words, it is unsuitable for I/O operations.
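To see the concurrency level in question, the common pool’s parallelism can be printed next to the processor count. This is only a small sketch; the class and method names are made up for illustration. By default the value is derived from the number of available processors, unless it is overridden with the system property java.util.concurrent.ForkJoinPool.common.parallelism.

```java
import java.util.concurrent.ForkJoinPool;

class CommonPoolInfo {
    static int cores() {
        return Runtime.getRuntime().availableProcessors();
    }

    // Target parallelism of the pool that the default async methods use.
    static int commonParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        System.out.println("available processors:    " + cores());
        System.out.println("common pool parallelism: " + commonParallelism());
    }
}
```

On a machine with few cores, that number is also roughly the upper bound on how many blocking downloads would be in flight at once.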

Unlike Stream, CompletableFuture allows you to specify an Executor for the async operations, so you can configure your own Executor to be suitable for I/O operations. On the other hand, when you deal with an Executor anyway, there is no need for CompletableFuture at all, at least not for such a simple task:

List<String> hosts = fetchTargetHosts();

int concurrentHosts = 10;
int concurrentConnections = 100;
ExecutorService hostEs = Executors.newWorkStealingPool(concurrentHosts);
ExecutorService connEs = Executors.newWorkStealingPool(concurrentConnections);

FileDownloader fileDownloader = new FileDownloader();
for(String host: hosts) hostEs.execute(()-> {
    for(String target: new HostDownloader(host).get())
        connEs.execute(()->fileDownloader.accept(target));
});
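For comparison, the other option mentioned before this listing, keeping CompletableFuture but handing it an explicit Executor, might be sketched like this. The pool size, the thread name "io-worker" and the recording task are assumptions for illustration; the task only records which thread ran it, to show that the work leaves the common pool.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ExplicitExecutorSketch {
    // Runs one task per host on a dedicated pool and records which thread ran it.
    static Queue<String> threadsUsed(List<String> hosts, int poolSize) {
        // Hypothetical I/O pool; the thread name is chosen for illustration.
        ExecutorService ioPool =
            Executors.newFixedThreadPool(poolSize, r -> new Thread(r, "io-worker"));
        Queue<String> names = new ConcurrentLinkedQueue<>();
        try {
            CompletableFuture<?>[] tasks = hosts.stream()
                .map(host -> CompletableFuture.runAsync(
                    () -> names.add(Thread.currentThread().getName()), ioPool))
                .toArray(CompletableFuture[]::new);
            CompletableFuture.allOf(tasks).join();
        } finally {
            ioPool.shutdown(); // non-daemon threads would otherwise keep the JVM alive
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(threadsUsed(Arrays.asList("h1", "h2", "h3"), 2));
    }
}
```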

In the loop that uses hostEs and connEs, you may consider either inlining the code of FileDownloader.accept into the lambda expression or reverting FileDownloader to a Runnable, so that the inner loop’s statement becomes connEs.execute(new FileDownloader(target)).
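A sketch of the Runnable variant, under stated assumptions: the download is replaced by recording the target, and a map from host to targets stands in for HostDownloader. The shutdown and awaitTermination calls are an addition. The worker threads of a work-stealing pool are daemon threads, so a real main method would have to wait in some way or the JVM could exit before the downloads finish.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class RunnableVariantSketch {
    // FileDownloader reverted to a Runnable; the "download" only records its target.
    static final class FileDownloader implements Runnable {
        private final String target;
        private final Queue<String> done;

        FileDownloader(String target, Queue<String> done) {
            this.target = target;
            this.done = done;
        }

        @Override
        public void run() { done.add(target); }
    }

    // targetsByHost replaces HostDownloader (hypothetical: host -> its targets).
    static Queue<String> downloadAll(Map<String, List<String>> targetsByHost) {
        ExecutorService hostEs = Executors.newWorkStealingPool(10);
        ExecutorService connEs = Executors.newWorkStealingPool(100);
        Queue<String> done = new ConcurrentLinkedQueue<>();
        for (List<String> targets : targetsByHost.values())
            hostEs.execute(() -> {
                for (String target : targets)
                    connEs.execute(new FileDownloader(target, done));
            });
        try {
            // Host tasks must finish submitting before connEs stops accepting work.
            hostEs.shutdown();
            hostEs.awaitTermination(1, TimeUnit.MINUTES);
            connEs.shutdown();
            connEs.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return done;
    }

    public static void main(String[] args) {
        Map<String, List<String>> targets = new HashMap<>();
        targets.put("h1", Arrays.asList("h1/a", "h1/b"));
        targets.put("h2", Arrays.asList("h2/a"));
        System.out.println(downloadAll(targets).size());
    }
}
```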

Holger answered Sep 23 '22 21:09