Method 1
The usual approach: very fast, and it works great.
public static int loops = 500;
private static ExecutorService customPool = Executors.newFixedThreadPool(loops);
.
.
Instant start = Instant.now();
LongSummaryStatistics stats = LongStream.range(0, loops).boxed()
.map(number -> CompletableFuture.supplyAsync(() -> DummyProcess.slowNetworkCall(number), customPool))
.collect(Collectors.toList()).stream() // collect first, else will be sequential
.map(CompletableFuture::join)
.mapToLong(Long::longValue)
.summaryStatistics();
log.info("cf completed in :: {}, summaryStats :: {} ", Duration.between(start, Instant.now()).toMillis(), stats);
// ... cf completed in :: 1054, summaryStats :: LongSummaryStatistics{count=500, sum=504008, min=1000, average=1008.016000, max=1017}
I understand that if I don't collect the stream first, then, because streams are lazy, the CompletableFutures will be created and joined one by one, and the pipeline will behave synchronously. So, as an experiment:
Method 2
Remove the intermediate collect step, but also make the stream parallel:
Instant start = Instant.now();
LongSummaryStatistics stats = LongStream.range(0, loops).boxed()
.parallel()
.map(number -> CompletableFuture.supplyAsync(() -> DummyProcess.slowNetworkCall(number), customPool))
.map(CompletableFuture::join) // direct join
.mapToLong(Long::longValue).summaryStatistics();
log.info("cfps_directJoin completed in :: {}, summaryStats :: {} ", Duration.between(start, Instant.now()).toMillis(), stats);
// ... cfps_directJoin completed in :: 8098, summaryStats :: LongSummaryStatistics{count=500, sum=505002, min=1000, average=1010.004000, max=1015}
Summary:
A pattern I observed: the calls seem to complete in batches rather than all at once.
Why is there this batching of calls in the parallel + direct join approach?
For completion, here's my dummy network call method:
public static Long slowNetworkCall(Long i) {
    Instant start = Instant.now();
    log.info(" {} going to sleep..", i);
    try {
        TimeUnit.MILLISECONDS.sleep(1000); // 1 second
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    log.info(" {} woke up..", i);
    return Duration.between(start, Instant.now()).toMillis();
}
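The effect of the intermediate collect described in Method 1 can be demonstrated with a minimal, self-contained sketch (class and method names here are hypothetical, and a 100 ms sleep stands in for the 1 s network call): without the collect, each future is joined before the next is created, so the total time grows linearly; with the collect, all futures are started before any join.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class LazyJoinDemo {

    // Big enough that the custom pool never limits concurrency in this demo.
    static final ExecutorService pool = Executors.newFixedThreadPool(8);

    // Stand-in for slowNetworkCall: sleeps 100 ms and returns its input.
    static long slowCall(long i) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return i;
    }

    // No intermediate collect: laziness means each future is joined
    // before the next one is created, so this takes ~n * 100 ms.
    static long directJoinMillis(int n) {
        Instant start = Instant.now();
        LongStream.range(0, n).boxed()
                .map(i -> CompletableFuture.supplyAsync(() -> slowCall(i), pool))
                .map(CompletableFuture::join)
                .forEach(x -> { });
        return Duration.between(start, Instant.now()).toMillis();
    }

    // Collect first: all futures are created (and started) before any
    // join, so the sleeps overlap and this takes ~100 ms for small n.
    static long collectFirstMillis(int n) {
        Instant start = Instant.now();
        List<CompletableFuture<Long>> futures = LongStream.range(0, n).boxed()
                .map(i -> CompletableFuture.supplyAsync(() -> slowCall(i), pool))
                .collect(Collectors.toList());
        futures.forEach(CompletableFuture::join);
        return Duration.between(start, Instant.now()).toMillis();
    }

    public static void main(String[] args) {
        int n = 5;
        System.out.println("direct join:   " + directJoinMillis(n) + " ms");
        System.out.println("collect first: " + collectFirstMillis(n) + " ms");
        pool.shutdown();
    }
}
```

With n = 5 the direct-join variant should take roughly 500 ms and the collect-first variant roughly 100 ms.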
list.stream() works in sequence on a single thread, while list.parallelStream() is processed in parallel, taking full advantage of the underlying multicore environment; the interesting aspect is which thread ends up handling which element.
The CompletableFuture.get() method (like join()) is blocking: it waits until the future is completed and returns the result after its completion.
In the case of a parallel stream, several worker threads are used simultaneously; internally it uses the common ForkJoinPool to create and manage those threads.
Usually, Java code has a single stream of processing and is executed sequentially. With parallel streams, the work is split across more than one thread, executed in parallel on separate cores, and the end result is the combination of the individual results.
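This can be made visible by recording which threads touch the elements (a minimal sketch; the class name is hypothetical and the exact set of worker threads is machine-dependent):

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class StreamThreadNames {

    // Returns the set of thread names that processed the elements.
    static Set<String> threadsUsed(boolean parallel) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        List<Integer> list = List.of(1, 2, 3, 4, 5, 6, 7, 8);
        (parallel ? list.parallelStream() : list.stream())
                .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        // Sequential: only the calling thread appears.
        System.out.println("stream():         " + threadsUsed(false));
        // Parallel: typically the caller plus ForkJoinPool.commonPool-worker-* threads.
        System.out.println("parallelStream(): " + threadsUsed(true));
    }
}
```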
This is an artifact of how ForkJoinPool handles things when you block its inner threads, and of how many new ones it spawns. Though I could probably find the exact lines where this happens, I am not sure it is worth it, for two reasons: that logic can change, and the code inside ForkJoinPool is far from trivial.
It seems that for both of us, ForkJoinPool.commonPool().getParallelism() returns 11, so I get the same results as you do. If you log ForkJoinPool.commonPool().getPoolSize() to find out how many active threads your code is using, you will see that after a certain period it stabilizes at 64. So the maximum number of tasks that can be processed at the same time is 64, which is on par with the result you see (64 threads blocking for 1 second each over 500 tasks gives roughly those 8 seconds).
If I run your code with -Djava.util.concurrent.ForkJoinPool.common.parallelism=50, it is now executed in 2 seconds, and the pool size is increased to 256. That means there is internal logic that adjusts these kinds of things.
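One way to observe this inflation yourself is to print the common pool's parallelism and pool size around a parallel stream whose join() calls block the workers (a hedged sketch; the class name is hypothetical and the numbers you see depend on your machine and JDK):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.LongStream;

public class CommonPoolInflation {

    // Runs `tasks` sleeping futures on a custom pool, joining them from a
    // parallel stream so the join() calls block common-pool workers.
    // Returns the sum of the results as a sanity check.
    static long runBlockedJoins(int tasks) {
        ExecutorService customPool = Executors.newFixedThreadPool(tasks);
        long sum = LongStream.range(0, tasks).boxed()
                .parallel()
                .map(i -> CompletableFuture.supplyAsync(() -> {
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    return i;
                }, customPool))
                .mapToLong(CompletableFuture::join)
                .sum();
        customPool.shutdown();
        return sum;
    }

    public static void main(String[] args) {
        System.out.println("parallelism:      " + ForkJoinPool.commonPool().getParallelism());
        System.out.println("pool size before: " + ForkJoinPool.commonPool().getPoolSize());
        runBlockedJoins(64);
        // The pool compensates for blocked workers by spawning extra threads,
        // so this is typically larger than the parallelism.
        System.out.println("pool size after:  " + ForkJoinPool.commonPool().getPoolSize());
    }
}
```

Comparing "pool size after" with the configured parallelism (and re-running with the -Djava.util.concurrent.ForkJoinPool.common.parallelism flag) shows the same adjustment behavior described above.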