
What is the expected behavior when a Java 8 Stream throws a RuntimeException?

When encountering a RuntimeException during stream processing, should the stream processing abort? Should it first finish? Should the exception be rethrown on Stream.close()? Is the exception rethrown as is, or is it wrapped? The JavaDoc of Stream and of the java.util.stream package says nothing about it.

All questions on Stackoverflow that I have found seem to be focused on how to wrap a checked exception from within a functional interface in order to make their code compile. Indeed, blog posts and similar articles on the Internet all focus on the same caveat. This is not my concern.

I know from my own experience that the processing of a sequential stream will abort as soon as a RuntimeException is thrown, and this exception is rethrown as is. A parallel stream behaves the same way, but only if the exception was thrown by the client's thread.

However, the example code (sketched below) demonstrates that if the exception is thrown by a "worker thread" (i.e., not the same thread as the one invoking the terminal operation) during parallel stream processing, then this exception is lost forever and the stream processing completes.

The example code will first run an IntStream in parallel, then a "normal" Stream in parallel.

The example will show that:

1) IntStream has no problem aborting parallel processing if a RuntimeException is encountered. The exception is re-thrown, wrapped in another RuntimeException.

2) Stream does not play as nice. In fact, the client thread never sees a trace of the RuntimeException thrown. Not only does the stream finish processing; it processes more elements than limit() specified!

In the example code, the IntStream is generated using IntStream.range(). The "normal" Stream has no notion of a "range" and is instead made up of 1s, but Stream.limit() is called to limit the stream to one billion elements.
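
Stripped down, the example does roughly the following. This is a sketch, not the exact code: the shared AtomicBoolean flag and AtomicInteger counter, the work() helper and the ForkJoinWorkerThread check (one way to make sure the throw happens on a worker thread) are illustrative.

import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class LostExceptionDemo {

    static final AtomicBoolean thrown = new AtomicBoolean();    // has anyone thrown yet?
    static final AtomicInteger processed = new AtomicInteger(); // elements seen so far

    public static void main(String[] args) {
        // 1) Parallel IntStream: the RuntimeException is wrapped and rethrown,
        //    and the processing aborts.
        try {
            IntStream.range(0, 1_000_000_000).parallel().forEach(i -> work());
        } catch (RuntimeException e) {
            System.out.println("IntStream: caught " + e);
        }
        System.out.println("IntStream: processed " + processed.get());

        thrown.set(false);
        processed.set(0);

        // 2) Parallel Stream of 1s, limited to one billion elements: no exception
        //    reaches the client thread, and the counter ends up higher than expected.
        try {
            Stream.generate(() -> 1).limit(1_000_000_000).parallel().forEach(i -> work());
        } catch (RuntimeException e) {
            System.out.println("Stream: caught " + e);
        }
        System.out.println("Stream: processed " + processed.get());
    }

    static void work() {
        processed.incrementAndGet();
        // Throw exactly once, and only from a fork/join worker thread.
        if (Thread.currentThread() instanceof ForkJoinWorkerThread
                && thrown.compareAndSet(false, true)) {
            System.out.println("Throwing from " + Thread.currentThread().getName());
            throw new RuntimeException("boom");
        }
    }
}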

Here's another twist. The example code that produces the IntStream does something like this:

IntStream.range(0, 1_000_000_000).parallel().forEach(..)

Change that to a generated stream just like the second example in the code:

IntStream.generate(() -> 1).limit(1_000_000_000).parallel().forEach(..)

The outcome for this IntStream is the same: the exception is wrapped and rethrown, and the processing aborts. But the second stream will now also wrap and rethrow the exception, and it will not process more elements than the limit! Thus: changing how the first stream is produced has a side effect on how the second stream behaves. To me, this is very odd.

The JavaDoc of ForkJoinPool.invoke() and ForkJoinTask says that exceptions are rethrown, and this is what I would have expected from a parallel stream.

Background

I encountered this "problem" when processing elements in a parallel stream taken from Collection.stream().parallel() (I haven't verified the behavior of Collection.parallelStream(), but it should be the same). What happened was that a "worker thread" crashed and then silently went away, while all other threads completed the stream successfully. My app uses a default exception handler that writes the exception to a log file, but not even this log file was created. The thread and its exception simply disappeared. Since I need to abort as soon as a runtime exception is thrown, one alternative is to write code that leaks this exception to the other workers, making them unwilling to proceed once an exception has been thrown by any other thread (a sketch of that idea follows below). Of course, this does not guarantee that the stream implementation won't simply keep spawning new threads trying to complete the stream. So I will probably end up not using parallel streams and instead do "normal" concurrent programming using a thread pool/executor.
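
That idea would look roughly like this (FailFastWorkers, handle() and the AtomicReference are illustrative names, not code from my app):

import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class FailFastWorkers {

    // First exception thrown by any worker; null means "no failure so far".
    private static final AtomicReference<RuntimeException> failure = new AtomicReference<>();

    static void process(List<String> items) {
        items.stream().parallel().forEach(item -> {
            // If another worker has already failed, refuse to do any more work.
            RuntimeException earlier = failure.get();
            if (earlier != null) {
                throw new RuntimeException("aborting, another worker failed", earlier);
            }
            try {
                handle(item);
            } catch (RuntimeException e) {
                failure.compareAndSet(null, e); // "leak" the failure to the other workers
                throw e;
            }
        });
        // Even if the stream completed and the worker's exception got lost,
        // the failure is still visible here.
        RuntimeException earlier = failure.get();
        if (earlier != null) {
            throw earlier;
        }
    }

    static void handle(String item) {
        // actual per-element work would go here
    }
}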

This shows that the problem of lost runtime exceptions is not isolated to streams generated by Stream.generate() or streams using Stream.limit(). The bottom line is that I would love to know: what is the expected behavior?

asked Sep 01 '16 by Martin Andersson


1 Answer

There is no difference in the behavior of these two streams regarding exception reporting, the problem is that you put both tests one after another into one method and let them access shared data structures.

There is a subtle, perhaps not sufficiently documented (if intended) behavior: when a stream operation completes exceptionally, it does not wait for the completion of all concurrent operations.

So when you catch the exception of the first stream operation, there are still some threads running and accessing your shared data. When you then reset your AtomicBoolean, one of these threads belonging to the first job will read the false value, turn it to true, print the message and throw an exception, which gets lost, as the stream operation has already completed exceptionally. Further, some of these threads will increment your counter after you reset it, which is why it ends up higher than the second job would allow. Your second job does not complete exceptionally, as all threads belonging to the second job will read a true value from the AtomicBoolean.

There are some ways to spot this.

When you remove the first stream operation, the second will complete exceptionally as expected. Also, inserting the statement

ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS);

between the two stream operations will fix the problem, as it waits for the completion of all threads.

However, the cleaner solution would be to let both stream operations use their own counter and flag.
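
A minimal sketch of that approach (the class and method names are illustrative, not the original test code), where each job owns its flag and counter instead of sharing static fields:

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class SeparateStateDemo {

    public static void main(String[] args) {
        // Each job gets its own flag and counter, so threads left over from the
        // first job cannot reset or increment the state of the second job.
        runTest("IntStream", state ->
                IntStream.range(0, 1_000_000_000).parallel()
                         .forEach(i -> state.work()));
        runTest("Stream", state ->
                Stream.generate(() -> 1).limit(1_000_000_000).parallel()
                      .forEach(i -> state.work()));
    }

    static void runTest(String name, Consumer<JobState> job) {
        JobState state = new JobState();
        try {
            job.accept(state);
        } catch (RuntimeException e) {
            System.out.println(name + ": caught " + e);
        }
        System.out.println(name + ": processed " + state.processed.get());
    }

    // Per-job state instead of shared static fields.
    static final class JobState {
        final AtomicBoolean thrown = new AtomicBoolean();
        final AtomicInteger processed = new AtomicInteger();

        void work() {
            processed.incrementAndGet();
            if (thrown.compareAndSet(false, true)) {
                throw new RuntimeException("boom");
            }
        }
    }
}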

That said, there is a subtle, implementation-dependent difference that causes the problem to disappear if you just swap the two operations. The IntStream.range operation produces a stream with a known size, which allows splitting it into concurrent tasks that intrinsically know how many elements to process. This allows abandoning these tasks in the exceptional case as described above. On the other hand, combining an infinite stream as returned by generate with limit does not produce a sized stream (though that would be possible). Since such a stream is treated as having an unknown size, the subtasks have to synchronize on a counter to ensure that the limit is obeyed. This causes the subtasks to (sometimes) complete, even in the exceptional case. But as said, that is a side effect of an implementation detail, not an intentional wait for completion. And since it's about concurrency, the result might be different if you run it multiple times.
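
One way to observe this difference is to check the SIZED characteristic of each pipeline's spliterator. Note that this inspects current implementation behavior rather than a documented guarantee; on current JDKs the first check prints true and the second false:

import java.util.Spliterator;
import java.util.stream.IntStream;

public class SizedCheck {
    public static void main(String[] args) {
        // range() produces a stream with a known size...
        System.out.println(IntStream.range(0, 1_000_000_000)
                .spliterator().hasCharacteristics(Spliterator.SIZED));   // true
        // ...while generate() + limit() currently does not, even though the size is known.
        System.out.println(IntStream.generate(() -> 1).limit(1_000_000_000)
                .spliterator().hasCharacteristics(Spliterator.SIZED));   // false
    }
}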

answered by Holger