How can I convert a CompletableFuture<Stream<T>> to a Stream<T> without blocking?

I am using the Async Http Client library (with Netty) to make asynchronous HTTP GET requests to a RESTful API. To preserve the non-blocking behavior, I return instances of CompletableFuture<T> as the result of those requests. So, where a RESTful API endpoint returns a JSON array, I return a CompletableFuture<T[]>.

Yet, following Erik Meijer's classification in The Four Essential Effects In Programming, I consider Stream<T> better suited as the result of a Java method that makes an asynchronous HTTP GET request and returns a JSON array. In this case we can see Stream<T> as the equivalent of Observable<T>, which is the result of an asynchronous computation that returns many values.

So, considering that resp holds the response, I can get a CompletableFuture<Stream<T>> as follows:

 CompletableFuture<T[]> resp = …
 return resp.thenApply(Arrays::stream);

However, how can I convert the resulting CompletableFuture<Stream<T>> to a Stream<T> without waiting for the computation to complete (i.e., I do NOT want to block on a get() invocation)?

I would like to have the same result as the following expression, but WITHOUT blocking on get():

return resp.thenApply(Arrays::stream).get();

rodolfino asked May 23 '16 14:05


1 Answer

You can build a Stream<T> that defers the call to the future's get() method until the stream is actually consumed, like this:

CompletableFuture<T[]> resp = ...
return Stream
        .of(resp)                               // Stream<CompletableFuture<T[]>>
        .flatMap(f -> Arrays.stream(f.join())); // Stream<T>

To simplify usage, I call join() instead of get(): join() reports failures through the unchecked CompletionException, so there are no checked exceptions to handle inside the lambda.
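
As a minimal sketch of why building the stream does not block, the example below wraps the same expression in a helper and runs it against a future that is still pending. The class name DeferredStream, the helper toLazyStream, and the manually completed future are illustrative assumptions; the future stands in for the real HTTP response.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DeferredStream {

    // Hypothetical helper: wraps the future in a one-element stream.
    // join() runs only when a terminal operation pulls elements through flatMap.
    static <T> Stream<T> toLazyStream(CompletableFuture<T[]> resp) {
        return Stream.of(resp).flatMap(f -> Arrays.stream(f.join()));
    }

    public static void main(String[] args) {
        // Stand-in for the HTTP response (assumption): a future that is not yet completed.
        CompletableFuture<String[]> resp = new CompletableFuture<>();

        // Building the pipeline returns immediately, even though resp is still pending.
        Stream<String> names = toLazyStream(resp).map(String::toUpperCase);

        // Complete the future by hand; in the real case the HTTP client would do this.
        resp.complete(new String[] {"a", "b", "c"});

        // The terminal operation is the point where join() is actually invoked.
        List<String> result = names.collect(Collectors.toList());
        System.out.println(result); // prints [A, B, C]
    }
}
```

Keep in mind that this only moves the wait: if the future is still pending when a terminal operation runs, join() blocks the consuming thread at that point.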

Miguel Gamboa answered Sep 22 '22 15:09