To allow multiple iterations on the resulting stream from a CompletableFuture<Stream<String>>
I am considering one of the following approaches:
Convert the resulting future to CompletableFuture<List<String>>
through: teams.thenApply(st -> st.collect(toList()))
Convert the resulting future to Flux<String>
with cache: Flux.fromStream(teams::join).cache();
Flux<T>
is the implementation of Publisher<T>
in project reactor.
Use case:
I would like to get a sequence with the premier league teams names (e.g. Stream<String>
) from a data source which provides a League
object with a Standing[]
(based on football-data RESTful API, e.g. http://api.football-data.org/v1/soccerseasons/445/leagueTable). Using AsyncHttpClient
and Gson
we have:
CompletableFuture<Stream<String>> teams = asyncHttpClient .prepareGet("http://api.football-data.org/v1/soccerseasons/445/leagueTable") .execute() .toCompletableFuture() .thenApply(Response::getResponseBody) .thenApply(body -> gson.fromJson(body, League.class)); .thenApply(l -> stream(l.standings).map(s -> s.teamName));
To re-use the resulting stream I have two options:
1. CompletableFuture<List<String>> res = teams.thenApply(st -> st.collect(toList())) 2. Flux<String> res = Flux.fromStream(teams::join).cache()
Flux<T>
is less verbose and provides all that I need. Yet, is it correct to use it in this scenario?
Or should I use CompletableFuture<List<String>>
instead? Or is there any other better alternative?
UPDATED with some thoughts (2018-03-16):
CompletableFuture<List<String>>
:
List<String>
will be collected in a continuation and when we need to proceed with the result of the future, maybe it is already completed.List<T>
.Flux<String>
:
.cache()
and forward it to the next layer, which can take advantage of the reactive API, e.g. web flux reactive controller, e.g. @GetMapping(produces =MediaType.TEXT_EVENT_STREAM) public Flux<String> getTeams() {…}
Flux<T>
we have to wrap it in a cacheable Flux<T>
(….cache()
) which in turn will add overhead on the first traversal, because it has to store the resulting items in an internal cache. CompletableFuture<Stream<String>> teams = ...; Flux<String> teamsFlux = Mono.fromFuture(teams).flatMapMany(stream -> Flux.fromStream(stream));
Flux.fromStream(teams::join)
is a code smell because it's blocking a thread to fetch the result from the CompletableFuture
which is running on another thread.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With