Say I have an API that, based on some query criteria, will find or construct a widget:
Widget getMatchingWidget(WidgetCriteria c) throws Throwable
The (synchronous) client code looks like:
try {
Widget w = getMatchingWidget(criteria);
processWidget(w);
} catch (Throwable t) {
handleError(t);
}
Now say finding or constructing a widget is unpredictably expensive, and I don't want clients to block while waiting for it. So I change it to:
CompletableFuture<Widget> getMatchingWidget(WidgetCriteria c)
Clients can then write either:
CompletableFuture<Widget> f = getMatchingWidget(criteria);
f.thenAccept(this::processWidget)
f.exceptionally(t -> { handleError(t); return null; })
or:
getMatchingWidget(criteria).whenComplete((t, w) -> {
if (t != null) { handleError(t); }
else { processWidget(t); }
});
Now, let's say instead the synchronous API can return 0 to n widgets:
Stream<Widget> getMatchingWidgets(WidgetCriteria c)
Naively, I could write:
CompletableFuture<Stream<Widget>> getMatchingWidgets(WidgetCriteria c)
However, this doesn't actually make the code non-blocking, it just pushes the blocking around -- either the Future
blocks until all the Widgets
are available, or the code that iterates over the Stream
blocks waiting for each Widget
. What I want is something that will let me process each widget as they arrive, something like:
void forEachMatchingWidget(WidgetCriteria c, Consumer<Widget> widgetProcessor)
But this doesn't offer error handling, and even if I add an additional Consumer<Throwable> errorHandler
, it doesn't let me, for instance, compose my widget retrieval with other queries, or transform the results.
So I'm looking for some composable thing that combines the characteristics of a Stream
(iterability, transformability) with the characteristics of a CompletableFuture
(asynchronous result and error handling). (And, while we're at it, back pressure might be nice.)
Is this a java.util.concurrent.Flow.Publisher? An io.reactivex.Observable? Something more complicated? Something simpler?
The features of Java stream are – A stream is not a data structure instead it takes input from the Collections, Arrays or I/O channels. Streams don’t change the original data structure, they only provide the result as per the pipelined methods.
In Java 9, Stream API has improved and new methods are added to the Stream interface. These methods are tabled below. It returns, if this stream is ordered, a stream consisting of the longest prefix of elements taken from this stream that match the given predicate.
Java 9 Stream ofNullable Method. Stream ofNullable method returns a sequential stream that contains a single element, if non-null. Otherwise, it returns an empty stream. It helps to handle null stream and NullPointerException.
Java Stream dropWhile() Method. Stream dropWhile method returns result on the basis of order of stream elements. Ordered stream: It returns a stream that contains elements after dropping the elements that match the given predicate.
Your use case falls very naturally into the world that RxJava is addressing. If we have an observable:
Observable<Widget> getMatchingWidgets(wc);
that produces zero or more widgets based on the criteria, then you can process each widget as it appears using:
getMatchingWidgets(wc)
.subscribeOn( backgroundScheduler )
.subscribe( w -> processWidget(w),
error -> handleError(error) );
The observable chain will run on the backgroundScheduler
, which is often a wrapper for a thread-pool executor service. If you need to do the final processing of each widget in your UI, you can use the observeOn()
operator to switch over to the UI scheduler before processing:
getMatchingWidgets(wc)
.subscribeOn( backgroundScheduler )
.observeOn( uiScheduler )
.subscribe( w -> processWidget(w),
error -> handleError(error) );
To me, the elegance of the RxJava approach is that it handles so many of the nuts and bolts of pipeline management in a fluent manner. Looking at that observer chain, you know exactly what is happening and where.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With