
Asynchronous, composable return value for vector/stream data in Java 9

Say I have an API that, based on some query criteria, will find or construct a widget:

Widget getMatchingWidget(WidgetCriteria c) throws Throwable

The (synchronous) client code looks like:

try {
  Widget w = getMatchingWidget(criteria);
  processWidget(w);
} catch (Throwable t) {
  handleError(t);
}

Now say finding or constructing a widget is unpredictably expensive, and I don't want clients to block while waiting for it. So I change it to:

CompletableFuture<Widget> getMatchingWidget(WidgetCriteria c)

Clients can then write either:

CompletableFuture<Widget> f = getMatchingWidget(criteria);
f.thenAccept(this::processWidget);
f.exceptionally(t -> { handleError(t); return null; });

or:

getMatchingWidget(criteria).whenComplete((w, t) -> {
  // whenComplete passes the result first and the throwable second
  if (t != null) { handleError(t); }
  else { processWidget(w); }
});
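
As a self-contained sketch of the whenComplete variant: the callback receives the result first and the throwable second, and for a supplyAsync failure the throwable usually arrives wrapped in a CompletionException. The Widget class, the String-based criteria, and the describe helper here are hypothetical stand-ins, not part of any real API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class WhenCompleteDemo {
    /** Hypothetical stand-in for the Widget type in the question. */
    static final class Widget {
        final String name;
        Widget(String name) { this.name = name; }
    }

    /** Hypothetical lookup: fails for empty criteria, otherwise "finds" a widget asynchronously. */
    static CompletableFuture<Widget> getMatchingWidget(String name) {
        return CompletableFuture.supplyAsync(() -> {
            if (name.isEmpty()) {
                throw new IllegalArgumentException("no criteria");
            }
            return new Widget(name);
        });
    }

    /** Runs one lookup and returns a description of what the callback saw. */
    static String describe(String name) {
        StringBuilder log = new StringBuilder();
        getMatchingWidget(name)
            // Arguments are (result, throwable); t is non-null only on failure.
            .whenComplete((w, t) -> {
                if (t != null) {
                    // Unwrap the CompletionException added by supplyAsync, if present.
                    Throwable cause = (t instanceof CompletionException && t.getCause() != null)
                            ? t.getCause() : t;
                    log.append("error: ").append(cause.getMessage());
                } else {
                    log.append("widget: ").append(w.name);
                }
            })
            // Recover with null so that join() below does not rethrow the failure.
            .exceptionally(t -> null)
            .join();
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe("gear")); // prints "widget: gear"
        System.out.println(describe(""));     // prints "error: no criteria"
    }
}
```

The join() call is only there so the demo can return a value; real client code would normally stay asynchronous and not block.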

Now, let's say instead the synchronous API can return 0 to n widgets:

Stream<Widget> getMatchingWidgets(WidgetCriteria c)

Naively, I could write:

CompletableFuture<Stream<Widget>> getMatchingWidgets(WidgetCriteria c)

However, this doesn't actually make the code non-blocking; it just pushes the blocking around. Either the Future blocks until all the Widgets are available, or the code that iterates over the Stream blocks waiting for each Widget. What I want is something that will let me process each widget as it arrives, something like:

void forEachMatchingWidget(WidgetCriteria c, Consumer<Widget> widgetProcessor)

But this doesn't offer error handling, and even if I add an additional Consumer<Throwable> errorHandler, it doesn't let me, for instance, compose my widget retrieval with other queries, or transform the results.

So I'm looking for some composable thing that combines the characteristics of a Stream (iterability, transformability) with the characteristics of a CompletableFuture (asynchronous result and error handling). (And, while we're at it, back pressure might be nice.)

Is this a java.util.concurrent.Flow.Publisher? An io.reactivex.Observable? Something more complicated? Something simpler?

David Moles asked Oct 19 '17 19:10



1 Answer

Your use case falls very naturally into the world that RxJava is addressing. If we have an observable:

Observable<Widget> getMatchingWidgets(WidgetCriteria wc);

that produces zero or more widgets based on the criteria, then you can process each widget as it appears using:

getMatchingWidgets(wc)
  .subscribeOn( backgroundScheduler )
  .subscribe( w -> processWidget(w),
              error -> handleError(error) );

The observable chain will run on the backgroundScheduler, which is often a wrapper for a thread-pool executor service. If you need to do the final processing of each widget in your UI, you can use the observeOn() operator to switch over to the UI scheduler before processing:

getMatchingWidgets(wc)
  .subscribeOn( backgroundScheduler )
  .observeOn( uiScheduler )
  .subscribe( w -> processWidget(w),
              error -> handleError(error) );

To me, the elegance of the RxJava approach is that it handles so many of the nuts and bolts of pipeline management in a fluent manner. Looking at that observer chain, you know exactly what is happening and where.
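
For comparison with the java.util.concurrent.Flow.Publisher option raised in the question, here is a rough, dependency-free sketch using the JDK's SubmissionPublisher. This is the JDK 9 Flow API, not RxJava, and it is only an illustration: the OneAtATime subscriber, the collect helper, and the use of plain strings in place of widgets are all hypothetical. It shows per-item delivery, an error callback, and simple back pressure by requesting one item at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Consumer;

class FlowWidgetDemo {
    /** Hypothetical subscriber that pulls one item at a time (simple back pressure). */
    static final class OneAtATime<T> implements Flow.Subscriber<T> {
        private final Consumer<T> onItem;
        private final Consumer<Throwable> onFailure;
        private final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        OneAtATime(Consumer<T> onItem, Consumer<Throwable> onFailure) {
            this.onItem = onItem;
            this.onFailure = onFailure;
        }

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // ask for the first item only
        }

        @Override public void onNext(T item) {
            onItem.accept(item);
            subscription.request(1); // ask for the next item once this one is handled
        }

        @Override public void onError(Throwable t) {
            onFailure.accept(t);
            done.countDown();
        }

        @Override public void onComplete() {
            done.countDown();
        }

        /** Blocks until onComplete or onError has been signalled (for the demo only). */
        void await() {
            try {
                done.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Publishes the given names asynchronously and returns what the subscriber recorded.
     * If failure is non-null, the publisher is closed exceptionally instead of normally.
     */
    static List<String> collect(List<String> names, Throwable failure) {
        List<String> received = new ArrayList<>();
        OneAtATime<String> subscriber = new OneAtATime<>(
                received::add,
                t -> received.add("error: " + t.getMessage()));
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            names.forEach(publisher::submit);
            if (failure != null) {
                publisher.closeExceptionally(failure); // subscriber gets onError
            }
        } // close() signals onComplete unless the publisher was already closed
        subscriber.await();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(collect(List.of("a", "b", "c"), null));
        System.out.println(collect(List.of("a"), new IllegalStateException("boom")));
    }
}
```

The Flow interfaces alone provide no operators such as map or filter, which is a large part of why a library like RxJava is convenient here.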

Bob Dalgleish answered Sep 23 '22 09:09