 

Why is `parallelStream` faster than the `CompletableFuture` implementation?

I wanted to improve the performance of one operation in my backend REST API: it polled several external APIs sequentially, collected their responses, and flattened them all into a single list.

Having recently learned about CompletableFuture, I decided to give it a try and compare it with a solution that simply replaces my stream() with a parallelStream().

Here is the code used for the benchmark:

package com.foo;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;


public class ConcurrentTest {

    static final List<String> REST_APIS =
            Arrays.asList("api1", "api2", "api3", "api4", "api5", "api6", "api7", "api8");
    MyTestUtil myTest = new MyTestUtil();
    long millisBefore; // used to benchmark

    @BeforeEach
    void setUp() {
        millisBefore = System.currentTimeMillis();
    }

    @AfterEach
    void tearDown() {
        System.out.printf("time taken: %.4fs%n",
                (System.currentTimeMillis() - millisBefore) / 1000d);
    }

    @Test
    void parallelSolution() { // 4s
        var parallel = REST_APIS.parallelStream()
                .map(api -> myTest.collectOneRestCall())
                .flatMap(List::stream)
                .collect(Collectors.toList());

        System.out.println("List of responses: " + parallel.toString());
    }

    @Test
    void futureSolution() throws Exception { // 8s
        var futures = myTest.collectAllResponsesAsync(REST_APIS);

        System.out.println("List of responses: " + futures.get()); // only blocks here
    }

    @Test
    void originalProblem() { // 32s
        var sequential = REST_APIS.stream()
                .map(api -> myTest.collectOneRestCall())
                .flatMap(List::stream)
                .collect(Collectors.toList());

        System.out.println("List of responses: " + sequential.toString());
    }
}


class MyTestUtil {

    public static final List<String> RESULTS = Arrays.asList("1", "2", "3", "4");

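    List<String> collectOneRestCall() {
        try {
            TimeUnit.SECONDS.sleep(4); // simulating the await of the response
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag before bailing out
            throw new RuntimeException(e);
        }
        // Returning outside of a finally block, so the exception above is not silently swallowed.
        return MyTestUtil.RESULTS; // always the same fake payload, for this demonstration
    }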

    CompletableFuture<List<String>> collectAllResponsesAsync(List<String> restApiUrlList) {

        /* Collecting the list of all the async requests that build a List<String>. */
        List<CompletableFuture<List<String>>> completableFutures = restApiUrlList.stream()
                .map(api -> nonBlockingRestCall())
                .collect(Collectors.toList());

        /* Creating a single Future that completes once all the Futures we just created have completed. */
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(completableFutures
                .toArray(new CompletableFuture[restApiUrlList.size()]));

        /* When all the Futures have completed, we join them to create a merged List<String>. */
        CompletableFuture<List<String>> allCompletableFutures = allFutures
                .thenApply(ignored -> completableFutures.stream()
                        .map(CompletableFuture::join)
                        .filter(Objects::nonNull) // failed calls completed with null, so drop them after join()
                        .flatMap(List::stream) // creating a List<String> from List<List<String>>
                        .collect(Collectors.toList())
                );

        return allCompletableFutures;
    }

    private CompletableFuture<List<String>> nonBlockingRestCall() {
        /* Manage the Exceptions here to ensure the wrapping Future returns the other calls. */
        // No Executor is passed, so the task runs on ForkJoinPool.commonPool() (when its parallelism > 1).
        return CompletableFuture.supplyAsync(this::collectOneRestCall)
                .exceptionally(ex -> null); // gets managed in the wrapping Future
    }

}

There are 8 (fake) APIs. Each call takes 4 seconds to respond and returns a list of 4 entities (Strings here, for simplicity).

The results:

  1. stream(): 32 seconds
  2. parallelStream(): 4 seconds
  3. CompletableFuture: 8 seconds

I'm quite surprised: I expected the last two to be almost identical. What exactly is causing the difference? As far as I know, both use ForkJoinPool.commonPool().

My naive interpretation is that parallelStream(), being a blocking operation, uses the main thread for part of its workload and therefore has one extra active thread to work with, whereas the CompletableFuture version is asynchronous and cannot make use of the main thread.
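One way to test this interpretation is to record which threads actually execute the tasks. The sketch below runs 8 trivial tasks each way and collects the thread names; `ThreadProbe`, `parallelStreamThreads` and `supplyAsyncThreads` are illustrative names, and the tasks do no real work. Assuming it is started from the main thread, the calling thread should appear in the parallelStream() set but never in the supplyAsync() set.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ThreadProbe {

    // Maps 8 elements through parallelStream() and collects the names of the threads that did the mapping.
    static Set<String> parallelStreamThreads() {
        return IntStream.range(0, 8).boxed()
                .collect(Collectors.toList())
                .parallelStream()
                .map(i -> Thread.currentThread().getName())
                .collect(Collectors.toSet());
    }

    // Submits 8 tasks with supplyAsync() and no Executor, as in the question; the caller only waits in join().
    static Set<String> supplyAsyncThreads() {
        List<CompletableFuture<String>> futures = IntStream.range(0, 8)
                .mapToObj(i -> CompletableFuture.supplyAsync(() -> Thread.currentThread().getName()))
                .collect(Collectors.toList());
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        System.out.println("caller:         " + Thread.currentThread().getName());
        System.out.println("parallelStream: " + parallelStreamThreads());
        System.out.println("supplyAsync:    " + supplyAsyncThreads());
    }
}
```

The parallel stream's terminal operation is invoked on the calling thread, which keeps splitting the source and always processes at least one piece itself; supplyAsync() only hands tasks to the pool.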

asked Dec 03 '19 at 22:12 by payne




1 Answer

CompletableFuture.supplyAsync() without an explicit Executor ends up using ForkJoinPool.commonPool(), which by default is initialized with a parallelism of Runtime.getRuntime().availableProcessors() - 1 (JDK 11 source).

So it looks like you have an 8-processor machine, which means there are 7 threads in the common pool.
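These numbers can be checked on any machine with a few lines. This is a sketch that assumes the `java.util.concurrent.ForkJoinPool.common.parallelism` system property is not set (it overrides the default), and `CommonPoolInfo` is a hypothetical name. Note that `ForkJoinPool.getCommonPoolParallelism()` reports the configured target parallelism, not the number of worker threads currently alive, since workers are started lazily.

```java
import java.util.concurrent.ForkJoinPool;

class CommonPoolInfo {

    // Default sizing rule for the common pool: availableProcessors() - 1, but at least 1.
    // Assumes java.util.concurrent.ForkJoinPool.common.parallelism is NOT set on the command line.
    static int expectedDefaultParallelism() {
        return Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
    }

    public static void main(String[] args) {
        System.out.println("availableProcessors():   " + Runtime.getRuntime().availableProcessors());
        System.out.println("common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("expected default:        " + expectedDefaultParallelism());
    }
}
```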

There are 8 API calls, but only 7 can run at a time on the common pool. In the CompletableFuture test, all 8 tasks are submitted to the pool while your main thread just blocks until they all complete. Seven of them execute at once, so the eighth has to wait 4 seconds for a worker to become free, which brings the total to 8 seconds.
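The two-wave effect can be reproduced independently of the machine's core count with a scaled-down sketch. It is a stand-in under explicit assumptions, not the question's code: a fixed-size pool replaces the common pool so the worker count is controlled, each "API call" is a 200 ms sleep instead of 4 s, and `WaveDemo`/`runMillis` are hypothetical names. With 8 tasks on 7 workers, one worker must run two calls back to back, so the total should be at least about 400 ms, whereas 8 workers should finish in roughly one 200 ms wave.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class WaveDemo {

    // Submits `tasks` sleeps of `sleepMillis` each to a fixed pool of `threads` workers
    // and returns the wall-clock time (ms) until all of them have completed.
    static long runMillis(int tasks, int threads, long sleepMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            long start = System.nanoTime();
            List<CompletableFuture<Void>> futures = IntStream.range(0, tasks)
                    .mapToObj(i -> CompletableFuture.runAsync(() -> sleep(sleepMillis), pool))
                    .collect(Collectors.toList());
            // Like futureSolution(): the calling thread only waits here and does none of the work.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } finally {
            pool.shutdown();
        }
    }

    // Simulated blocking "API call".
    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("8 tasks, 7 threads: " + runMillis(8, 7, 200) + " ms"); // two waves
        System.out.println("8 tasks, 8 threads: " + runMillis(8, 8, 200) + " ms"); // one wave
    }
}
```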

parallelStream() also uses this same pool; the difference is that the thread executing the stream's terminal operation (here, the main thread) takes part in the work as well. It processes one of the tasks itself, leaving only 7 to be distributed to the common pool, so in this scenario there are just enough threads to run everything in parallel. Try increasing the number of tasks to 9 and you will get the 8-second run-time for your test.
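The reasoning boils down to simple arithmetic: with equal-length blocking calls, the wall time is roughly the number of "waves", ceil(tasks / threads), multiplied by the duration of one call. A minimal, idealized model reproduces the numbers discussed here, counting the main thread as one extra worker for parallelStream(). The `WaveModel` class and `expectedSeconds` method are illustrative, and the model ignores scheduling overhead and how a parallel stream actually splits its source.

```java
class WaveModel {

    // Idealized wall-clock time for `tasks` equal-length blocking calls spread over `threads`
    // threads: the calls run in ceil(tasks / threads) waves, each lasting `secondsPerCall`.
    static int expectedSeconds(int tasks, int threads, int secondsPerCall) {
        int waves = (tasks + threads - 1) / threads; // integer ceiling division
        return waves * secondsPerCall;
    }

    public static void main(String[] args) {
        int poolThreads = 7; // common pool on an 8-processor machine
        System.out.println("stream():                  " + expectedSeconds(8, 1, 4) + " s");
        System.out.println("CompletableFuture:         " + expectedSeconds(8, poolThreads, 4) + " s");
        System.out.println("parallelStream():          " + expectedSeconds(8, poolThreads + 1, 4) + " s"); // + main thread
        System.out.println("parallelStream(), 9 calls: " + expectedSeconds(9, poolThreads + 1, 4) + " s");
    }
}
```

Under these assumptions, main prints 32 s, 8 s, 4 s and 8 s, matching the measured results and the 9-task prediction.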

answered Oct 02 '22 at 15:10 by prunge