 

Run IO computations in parallel in Java8

Tags:

java

java-8

I'm familiar with functional programming, usually in Scala and JavaScript. I'm working on a Java 8 project and I'm not sure how I'm supposed to run through a list/stream of items and perform a side effect for each of them in parallel, using a custom thread pool, and return an object on which it's possible to listen for completion (whether it's a success or a failure).

Currently I have the following code. It seems to work (I'm using the Play framework Promise implementation as the return type), but it doesn't seem ideal, because ForkJoinPool is not meant to be used for IO-intensive computations in the first place.

public static F.Promise<Void> performAllItemsBackup(Stream<Item> items) {
    ForkJoinPool pool = new ForkJoinPool(3);
    ForkJoinTask<F.Promise<Void>> result = pool
            .submit(() -> {
                try {
                    items.parallel().forEach(performSingleItemBackup);
                    return F.Promise.<Void>pure(null);
                } catch (Exception e) {
                    return F.Promise.<Void>throwing(e);
                }
            });

    try {
        return result.get();
    } catch (Exception e) {
        throw new RuntimeException("Unable to get result", e);
    }
}

Can someone give me a more idiomatic implementation of the above function? Ideally one that doesn't use ForkJoinPool, uses a more standard return type, and relies on the most recent Java 8 APIs. I'm not sure which one I'm supposed to use among CompletableFuture, CompletionStage, ForkJoinTask...

Sebastien Lorber asked Jan 03 '18 11:01





1 Answer

A canonical solution would be:

public static CompletableFuture<Void> performAllItemsBackup(Stream<Item> items) {
    ForkJoinPool pool = new ForkJoinPool(3);
    try {
        return CompletableFuture.allOf(
            items.map(CompletableFuture::completedFuture)
                 .map(f -> f.thenAcceptAsync(performSingleItemBackup, pool))
                 .toArray(CompletableFuture<?>[]::new));
    } finally {
        pool.shutdown();
    }
}
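The same idea can also be written without first wrapping every item in completedFuture: CompletableFuture.runAsync(Runnable, Executor) submits each backup to the given executor directly. This is a variant, not the answer's code. To keep the sketch self-contained, it assumes String items instead of Item and a hypothetical performSingleItemBackup Consumer that only records what it was given.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class RunAsyncBackupSketch {
    // Hypothetical stand-in for performSingleItemBackup: just records each item.
    static final Set<String> backedUp = ConcurrentHashMap.newKeySet();
    static final Consumer<String> performSingleItemBackup = backedUp::add;

    public static CompletableFuture<Void> performAllItemsBackup(Stream<String> items) {
        ForkJoinPool pool = new ForkJoinPool(3);
        try {
            // Submit one task per item directly to the custom pool.
            CompletableFuture<?>[] tasks = items
                .map(item -> CompletableFuture.runAsync(
                        () -> performSingleItemBackup.accept(item), pool))
                .toArray(CompletableFuture<?>[]::new);
            // allOf completes once every task has completed (or one has failed).
            return CompletableFuture.allOf(tasks);
        } finally {
            pool.shutdown(); // stops accepting new tasks; submitted ones still run
        }
    }

    public static void main(String[] args) {
        performAllItemsBackup(Stream.of("a", "b", "c", "d", "e")).join();
        System.out.println(backedUp.size()); // prints 5
    }
}
```

Both forms produce one CompletableFuture per item; runAsync simply skips the intermediate already-completed future.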

Note that the interaction between ForkJoinPool and parallel streams (a parallel stream started from within a task of a custom pool running in that pool) is an unspecified implementation detail you should not rely on. In contrast, CompletableFuture provides a dedicated API for providing an Executor. It doesn’t even have to be a ForkJoinPool:

public static CompletableFuture<Void> performAllItemsBackup(Stream<Item> items) {
    ExecutorService pool = Executors.newFixedThreadPool(3);
    try {
        return CompletableFuture.allOf(
            items.map(CompletableFuture::completedFuture)
                 .map(f -> f.thenAcceptAsync(performSingleItemBackup, pool))
                 .toArray(CompletableFuture<?>[]::new));
    } finally {
        pool.shutdown();
    }
}

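In either case, you should shut down the executor explicitly instead of relying on automatic cleanup. Calling shutdown() in the finally block does not cancel the backups that were already submitted; it only stops the executor from accepting new tasks, so its threads terminate once the pending work is done.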
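A small experiment can make the executor-related behavior visible. This minimal sketch follows the fixed-thread-pool version, but it assumes Integer items and a hypothetical blocking performSingleItemBackup that sleeps briefly, records the name of the thread it runs on, and tracks how many backups run at once. The ThreadFactory that names threads "backup-N" is added only for illustration. After join(), all backups have completed even though shutdown() was called right after submission, every task ran on a pool thread, and no more than three ran concurrently.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class FixedPoolBackupSketch {
    static final Set<String> threadNames = ConcurrentHashMap.newKeySet();
    static final AtomicInteger running = new AtomicInteger();
    static final AtomicInteger maxRunning = new AtomicInteger();
    static final AtomicInteger completed = new AtomicInteger();

    // Hypothetical blocking backup: records its thread and the current concurrency.
    static final Consumer<Integer> performSingleItemBackup = item -> {
        maxRunning.accumulateAndGet(running.incrementAndGet(), Math::max);
        threadNames.add(Thread.currentThread().getName());
        try {
            Thread.sleep(20); // simulate blocking IO
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            running.decrementAndGet();
        }
        completed.incrementAndGet();
    };

    public static CompletableFuture<Void> performAllItemsBackup(Stream<Integer> items) {
        AtomicInteger threadCount = new AtomicInteger();
        // Named threads show which executor actually ran each task.
        ExecutorService pool = Executors.newFixedThreadPool(3,
                r -> new Thread(r, "backup-" + threadCount.incrementAndGet()));
        try {
            return CompletableFuture.allOf(
                items.map(CompletableFuture::completedFuture)
                     .map(f -> f.thenAcceptAsync(performSingleItemBackup, pool))
                     .toArray(CompletableFuture<?>[]::new));
        } finally {
            // Usually runs while tasks are still queued or running; they are not cancelled.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        performAllItemsBackup(IntStream.range(0, 10).boxed()).join();
        System.out.println("completed: " + completed.get()); // prints completed: 10
        System.out.println("max concurrent: " + maxRunning.get()); // at most 3
        System.out.println("threads: " + threadNames);
    }
}
```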

If you need an F.Promise<Void> result, you can use:

public static F.Promise<Void> performAllItemsBackup(Stream<Item> items) {
    ExecutorService pool = Executors.newFixedThreadPool(3);
    try {
        return CompletableFuture.allOf(
            items.map(CompletableFuture::completedFuture)
                 .map(f -> f.thenAcceptAsync(performSingleItemBackup, pool))
                 .toArray(CompletableFuture<?>[]::new))
            .handle((v, e) -> e!=null? F.Promise.<Void>throwing(e): F.Promise.pure(v))
            .join();
    } finally {
        pool.shutdown();
    }
}

But note that this, like your original code, only returns once the whole operation has completed, whereas the methods returning a CompletableFuture let the operations run asynchronously until the caller invokes join or get.
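To see the difference from the caller's side, here is a minimal sketch that returns the allOf future without joining it. It assumes a hypothetical backup that blocks on a CountDownLatch as a stand-in for slow IO, so the future is observably not done when performAllItemsBackup returns. The caller then registers a completion listener with whenComplete, which fires on success or failure, and only blocks with join() where the outcome is actually needed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class AsyncReturnSketch {
    // Hypothetical backup that blocks until the gate is opened (stand-in for slow IO).
    static Consumer<String> blockingBackup(CountDownLatch gate) {
        return item -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
    }

    public static CompletableFuture<Void> performAllItemsBackup(
            Stream<String> items, Consumer<String> backup) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            return CompletableFuture.allOf(
                items.map(CompletableFuture::completedFuture)
                     .map(f -> f.thenAcceptAsync(backup, pool))
                     .toArray(CompletableFuture<?>[]::new));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        CountDownLatch gate = new CountDownLatch(1);
        CompletableFuture<Void> result =
                performAllItemsBackup(Stream.of("a", "b", "c"), blockingBackup(gate));
        // The method has returned, but the backups are still blocked on the gate.
        System.out.println(result.isDone()); // prints false
        // Listen for success or failure without blocking the caller.
        result.whenComplete((v, e) ->
                System.out.println(e == null ? "all backups done" : "backup failed: " + e));
        gate.countDown(); // let the backups proceed
        result.join();    // block only where the outcome is actually needed
    }
}
```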

To return a truly asynchronous Promise, you have to wrap the entire operation, e.g.

public static F.Promise<Void> performAllItemsBackup(Stream<Item> stream) {
    return F.Promise.pure(stream).flatMap(items -> {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            return CompletableFuture.allOf(
                items.map(CompletableFuture::completedFuture)
                     .map(f -> f.thenAcceptAsync(performSingleItemBackup, pool))
                     .toArray(CompletableFuture<?>[]::new))
                .handle((v, e) -> e!=null? F.Promise.<Void>throwing(e): F.Promise.pure(v))
                .join();
        } finally {
            pool.shutdown();
        }
    });
}

But it’s better to decide on one API instead of jumping back and forth between two different APIs.
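Staying entirely within CompletableFuture, the handle step from the F.Promise versions works the same way without Play: it receives either the result or the failure and turns them into a single value. A minimal sketch, assuming a hypothetical backup that fails for the item "bad" and a String description as the hypothetical result type. Note that handle typically receives the failure wrapped in a CompletionException, so the sketch unwraps it before reporting.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class HandleSketch {
    // Hypothetical backup that fails for one particular item.
    static final Consumer<String> performSingleItemBackup = item -> {
        if (item.equals("bad")) {
            throw new IllegalStateException("backup failed for " + item);
        }
    };

    static String backupAndDescribe(Stream<String> items) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            return CompletableFuture.allOf(
                items.map(CompletableFuture::completedFuture)
                     .map(f -> f.thenAcceptAsync(performSingleItemBackup, pool))
                     .toArray(CompletableFuture<?>[]::new))
                // handle() sees either the result (e == null) or the failure.
                .handle((v, e) -> {
                    if (e == null) {
                        return "ok";
                    }
                    // Unwrap the CompletionException to report the original failure.
                    Throwable cause = e instanceof CompletionException && e.getCause() != null
                            ? e.getCause() : e;
                    return "failed: " + cause.getMessage();
                })
                .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(backupAndDescribe(Stream.of("a", "b")));   // prints ok
        System.out.println(backupAndDescribe(Stream.of("a", "bad"))); // prints failed: backup failed for bad
    }
}
```

Like the F.Promise variant, this blocks in join(); returning the future from handle(...) without joining keeps it asynchronous.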

Holger answered Oct 19 '22 23:10
