Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to transform CompletableFuture<Stream<T>> to Stream<CompletableFuture<T>>?

That's it. Really could not find something even close. I want to implement the following chain of tasks

List<Item> items = Collections.singletonList(new Item("John Smith", "1997-2014"));

Stream<CompletableFuture<List<ScrappingResult>>> scrappingFutures =
    items.stream().map(item ->
        CompletableFuture.supplyAsync(() -> new ScrappingTask(item).call()));

Stream<CompletableFuture<ScrappingResult>> scrappingFuturesUnwrapped =
    scrappingFutures.map(resultsFuture -> ???);

Stream<CompletableFuture<TrimmingResult>> trimmingResults = scrappingFuturesUnwrapped.map(resultFuture ->
    // thenCompose?
    resultFuture.thenCompose(result -> {
        Path clipsDir = Paths.get("./"
            + result.getItem().getName()
            + "/" + result.getItem().getTimespan());

        AtomicInteger clipIdx = new AtomicInteger();

        return result.getVideo().getClips().stream()
            .map(clip -> CompletableFuture.supplyAsync(() ->
                new TrimmingTask(
                    ffmpegPath,
                    result.getVideo().getVideoUrl(),
                    clip,
                    clipsDir.resolve("clip_" + clipIdx.incrementAndGet() + ".mp3")).call())
            );
    });
);

The last line is not syntactically correct, but I hope conveys the idea. So, I want to do something like flatMap twice and get Stream<CompletableFuture<TrimmingResult>> at the end.

How do I do that?

Thanks.

like image 685
Valya Avatar asked Oct 26 '25 06:10

Valya


1 Answers

If I don't misunderstand your intention, you want to flatten the result only. you can use the Spliterator to receive result lazily & use flatMap to merge Streams into a flatten stream, for example:

Stream<CompletableFuture<ScrappingResult>> scrappingFuturesUnwrapped =
               scrappingFutures.flatMap(each -> unwrap(each));

static <T> Stream<? extends CompletableFuture<T>> 
  unwrap(CompletableFuture<? extends List<? extends T>> master) {

    return generate(new Predicate<Consumer<? super CompletableFuture<T>>>() {

        private Iterator<? extends T> cursor;

        @Override
        public boolean test(Consumer<? super CompletableFuture<T>> consumer) {
            cursor = cursor == null ? await().iterator() : cursor;
            if (cursor.hasNext()) {
                consumer.accept(completedFuture(cursor.next()));
                return true;
            }
            return false;
        }

        //                        v--- blocked at the first time
        private List<? extends T> await() {
            try {
                return master.get();
            } catch (InterruptedException | ExecutionException e) {
                throw new CompletionException(e);
            }
        }
    });
}

static <T> Stream<? extends CompletableFuture<T>> 
  generate(Predicate<Consumer<? super CompletableFuture<T>>> generator) {

    long unknownSize = Long.MAX_VALUE;
    return stream(new AbstractSpliterator<CompletableFuture<T>>(unknownSize, 0) {
        public boolean tryAdvance(Consumer<? super CompletableFuture<T>> action) {
            return generator.test(action);
        }
    }, false);
}

Summary

The solution above is just my first thought and it isn't the best approach in this case, you can think it against with big design first. However, even if it is a poor solution but I'll keep it here since it maybe give somebody thinking in other ways. for more details, you can see the comments of @Holger from here and there.

I admit the best approach is what @Holger have said below, since there is no one write it down, please let me record it to serve more people.

Stream<CompletableFuture<ScrappingResult>> scrappingFuturesUnwrapped =
               scrappingFutures.flatMap(each -> each.join().stream())
                               .map(CompletableFuture::completedFuture);
like image 98
holi-java Avatar answered Oct 28 '25 20:10

holi-java



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!