Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to iterate a stream even if exceptions are thrown?

stream.map(obj -> doMap(obj)).collect(Collectors.toList());

private String doMap(Object obj) {
    if (objectIsInvalid) {
    throw new ParseException("Object could not be parsed");
    }
}

Problem: how can I throw the exception and make the stream iteration know that it should not break the whole iteration, but continue with the next element (and eventually log failed objects)?

like image 395
membersound Avatar asked May 22 '15 12:05

membersound


People also ask

Is stream iterator lazy?

iterator() . The latter will process all items of the stream in order to store them into a collection. In contrast, Stream. iterator() will just return a wrapper around the Stream 's Spliterator which will process all items lazily like all other stream operations do.

Is streams better than for loop?

Remember that loops use an imperative style and Streams a declarative style, so Streams are likely to be much easier to maintain. If you have a small list, loops perform better. If you have a huge list, a parallel stream will perform better.

How do you handle exceptions in lambda?

Because IOException is a checked exception, we must handle it explicitly. We have two options. First, we may simply throw the exception outside of our method and take care of it somewhere else. Alternatively, we can handle it inside the method that uses a lambda expression.


1 Answers

Here's one weird trick you can use to improve your exception handling.

Let's say your mapper function is like this:

String doMap(Object obj) {
    if (isInvalid(obj)) {
        throw new IllegalArgumentException("info about obj");
    } else {
        return obj.toString();
    }
}

This returns a result if the object is valid, but it throws an exception if the object is invalid. Unfortunately if you stick this directly into a pipeline, any error will stop the pipeline execution. What you want is something like an "either" type that can hold either a value or an error indicator (which would be a exception in Java).

Turns out that CompletableFuture can hold either a value or an exception. Although it's intended for asynchronous processing -- which isn't occurring here -- we only have to contort it a little bit to use for our purposes.

First, given a stream of objects to process, we call the mapping function wrapped in a call to supplyAsync:

 CompletableFuture<String>[] cfArray = 
        stream.map(obj -> CompletableFuture.supplyAsync(() -> doMap(obj), Runnable::run))
              .toArray(n -> (CompletableFuture<String>[])new CompletableFuture<?>[n]);

(Unfortunately, the generic array creation gives an unchecked warning, which will have to be suppressed.)

The odd construct

 CompletableFuture.supplyAsync(supplier, Runnable::run)

runs the supplier "asynchronously" on the provided Executor Runnable::run, which simply runs the task immediately in this thread. In other words, it runs the supplier synchronously.

The trick is that the CompletableFuture instance returned from this call contains either the value from the supplier, if it returned normally, or it contains an exception, if the supplier threw one. (I'm disregarding cancellation here.) We then gather the CompletableFuture instances into an array. Why an array? It's setup for the next part:

CompletableFuture.allOf(cfArray).join();

This normally waits for the array of CFs to complete. Since they've been run synchronously, they should already all be complete. What's important for this case is that join() will throw a CompletionException if any of the CFs in the array has completed exceptionally. If the join completes normally, we can simply gather up the return values. If the join throws an exception, we can either propagate it, or we can catch it and process the exceptions stored in the CFs in the array. For example,

try {
    CompletableFuture.allOf(cfArray).join();
    // no errors
    return Arrays.stream(cfArray)
                 .map(CompletableFuture::join)
                 .collect(toList());
} catch (CompletionException ce) {
    long errcount =
        Arrays.stream(cfArray)
              .filter(CompletableFuture::isCompletedExceptionally)
              .count();
    System.out.println("errcount = " + errcount);
    return Collections.emptyList();
}

If all are successful, this returns a list of the values. If there are any exceptions, this counts the number of exceptions and returns an empty list. Of course, you could easily do something else, like log the details of the exceptions, filter out the exceptions and return a list of valid values, etc.

like image 130
Stuart Marks Avatar answered Oct 10 '22 13:10

Stuart Marks