 

CompletableFuture on ParallelStream gets batched and runs slower than sequential stream?

Method 1

The usual approach: very fast, and works great.

public static int loops = 500;
private static ExecutorService customPool = Executors.newFixedThreadPool(loops);
.
.
Instant start = Instant.now();
LongSummaryStatistics stats = LongStream.range(0, loops).boxed()
        .map(number -> CompletableFuture.supplyAsync(() -> DummyProcess.slowNetworkCall(number), customPool))
        .collect(Collectors.toList()).stream() // collect first, else will be sequential
        .map(CompletableFuture::join)
        .mapToLong(Long::longValue)
        .summaryStatistics();

log.info("cf completed in :: {}, summaryStats :: {} ", Duration.between(start, Instant.now()).toMillis(), stats);
// ... cf completed in :: 1054, summaryStats :: LongSummaryStatistics{count=500, sum=504008, min=1000, average=1008.016000, max=1017} 

I understand that if I don't collect the stream first, then by the nature of laziness, the stream will spin up the CompletableFutures one by one and behave synchronously.
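For reference, that lazy, sequential variant would look roughly like this (a sketch reusing the loops, customPool and DummyProcess.slowNetworkCall from above; with 500 one-second calls it would take on the order of 500 seconds, since each join blocks before the next future is created):

Instant start = Instant.now();
LongSummaryStatistics stats = LongStream.range(0, loops).boxed()
        .map(number -> CompletableFuture.supplyAsync(() -> DummyProcess.slowNetworkCall(number), customPool))
        .map(CompletableFuture::join) // joins right away, so only one call is ever in flight
        .mapToLong(Long::longValue)
        .summaryStatistics();

So, as an experiment: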

Method 2

Remove the intermediate collect step, but also make the stream parallel:

Instant start = Instant.now();
LongSummaryStatistics stats = LongStream.range(0, loops).boxed()
        .parallel()
        .map(number -> CompletableFuture.supplyAsync(() -> DummyProcess.slowNetworkCall(number), customPool))
        .map(CompletableFuture::join) // direct join
        .mapToLong(Long::longValue).summaryStatistics();

log.info("cfps_directJoin completed in :: {}, summaryStats :: {} ", Duration.between(start, Instant.now()).toMillis(), stats);
// ... cfps_directJoin completed in :: 8098, summaryStats :: LongSummaryStatistics{count=500, sum=505002, min=1000, average=1010.004000, max=1015}

Summary:

  • Method 1 :: 1 second
  • Method 2 :: 8 seconds

A pattern I observed:

  1. the parallel stream approach "batches" 60 calls at once, so with 500 loops, 500/60 ≈ 8 batches, each taking 1 second, hence the total of ~8 seconds (a sketch to visualize this follows the list)
  2. when I reduce the loop count to 300, there are 300/60 = 5 batches, and it indeed takes 5 seconds to complete.
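One rough way to see the batching (the per-second bucketing here is my own illustration, not part of the original snippets) is to record the second in which each call completes and count completions per one-second window:

Instant t0 = Instant.now();
Map<Long, Long> completionsPerSecond = LongStream.range(0, loops).boxed()
        .parallel()
        .map(number -> CompletableFuture.supplyAsync(() -> DummyProcess.slowNetworkCall(number), customPool))
        .map(CompletableFuture::join)
        .map(ignored -> Duration.between(t0, Instant.now()).getSeconds()) // one-second bucket for this completion
        .collect(Collectors.groupingBy(second -> second, Collectors.counting()));
log.info("completions per second :: {}", completionsPerSecond); // if the batching above holds, roughly 60 per bucket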

So, the question is:

Why is there this batching of calls in the parallel + direct collection approach?


For completeness, here's my dummy network call method:

    public static Long slowNetworkCall(Long i) {
        Instant start = Instant.now();
        log.info(" {} going to sleep..", i);
        try {
            TimeUnit.MILLISECONDS.sleep(1000); // 1 second
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info(" {} woke up..", i);
        return Duration.between(start, Instant.now()).toMillis();
    }
asked May 06 '21 by Somjit




1 Answer

This is an artifact of how ForkJoinPool handles things when you block its inner threads, and of how many new ones it spawns. Though I could probably find the exact lines where this happens, I am not sure it is worth it, for two reasons:

  • that logic can change

  • the code inside ForkJoinPool is far from trivial

It seems that for both of us, ForkJoinPool.commonPool().getParallelism() will return 11, so I get the same results as you do. If you log ForkJoinPool.commonPool().getPoolSize() to find out how many active threads your code is using, you will see that after a certain period it stabilizes at 64. So the maximum number of tasks that can be processed at the same time is 64, which is in line with the result that you see (those 8 seconds).
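A minimal sketch of how you might watch that happen (the monitoring thread here is my own addition, not part of the original answer): sample the common pool size on a side thread while the parallel-stream variant runs.

ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
monitor.scheduleAtFixedRate(
        () -> log.info("commonPool parallelism :: {}, poolSize :: {}",
                ForkJoinPool.commonPool().getParallelism(),
                ForkJoinPool.commonPool().getPoolSize()),
        0, 500, TimeUnit.MILLISECONDS);

// ... run the Method 2 code here ...

monitor.shutdown(); // in the runs described above, poolSize climbs and then settles around 64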

If I run your code with -Djava.util.concurrent.ForkJoinPool.common.parallelism=50, it now executes in 2 seconds, and the pool size increases to 256. That means there is internal logic that adjusts this kind of thing.
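For reference, that property is only read when the common pool is first created, so it has to be set before anything touches the pool. It can be passed as a JVM flag or set programmatically at the very start of main (the class name below is just a placeholder):

// As a JVM flag (placeholder main class):
//   java -Djava.util.concurrent.ForkJoinPool.common.parallelism=50 BatchingDemo
// Or programmatically, before anything initializes the common pool:
public class BatchingDemo {
    public static void main(String[] args) {
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "50");
        // should print 50, provided nothing initialized the common pool earlier
        System.out.println(ForkJoinPool.commonPool().getParallelism());
        // ... then run the parallel-stream code from the question ...
    }
}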

answered Oct 21 '22 by Eugene