When encountering a RuntimeException during stream processing, should the stream processing abort? Should it first finish? Should the exception be rethrown on Stream.close()? Is the exception rethrown as is, or is it wrapped? The JavaDoc of Stream and of the package java.util.stream has nothing to say about it.
All the questions on Stack Overflow that I have found seem to focus on how to wrap a checked exception thrown from within a functional interface in order to make the code compile. Indeed, blog posts and similar articles on the Internet all focus on the same caveat. This is not my concern.
I know from my own experience that the processing of sequential streams aborts as soon as a RuntimeException is thrown, and that this exception is rethrown as is. The same is true for a parallel stream, but only if the exception was thrown in the client's thread.

However, the example code described below demonstrates that if the exception is thrown by a "worker thread" (i.e., not the same thread as the one invoking the terminal operation) during parallel stream processing, then this exception is lost forever and the stream processing completes.
The example code will first run an IntStream in parallel, then a "normal" Stream in parallel. The example will show that:

1) IntStream has no problem aborting parallel processing if a RuntimeException is encountered. The exception is rethrown, wrapped in another RuntimeException.

2) Stream does not play as nice. In fact, the client thread will never see a trace of the thrown RuntimeException. Not only does the stream finish processing; more elements than limit() specified will be processed!

In the example code, the IntStream is generated using IntStream.range(). The "normal" Stream has no notion of a "range" and is instead made up of 1s, with Stream.limit() called to limit the stream to one billion elements.
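The original example code is only linked from the question, not shown here, so what follows is a minimal reconstruction of the kind of test it describes, under the assumption (matching the accepted answer below) that both jobs share a single AtomicBoolean flag and an AtomicLong counter. Class and variable names are mine, not from the original post:

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class ParallelStreamExceptionDemo {
    // Shared across both jobs, as the accepted answer below points out.
    static final AtomicBoolean thrown = new AtomicBoolean();
    static final AtomicLong count = new AtomicLong();

    static void work() {
        count.incrementAndGet();
        // The first element to get here throws; every other element proceeds.
        if (thrown.compareAndSet(false, true)) {
            System.out.println("Throwing in " + Thread.currentThread().getName());
            throw new RuntimeException("boom");
        }
    }

    public static void main(String[] args) {
        try {
            IntStream.range(0, 1_000_000_000).parallel().forEach(i -> work());
        } catch (RuntimeException e) {
            System.out.println("IntStream: caught " + e);
        }
        System.out.println("IntStream: count = " + count.get());

        thrown.set(false);
        count.set(0);

        try {
            Stream.generate(() -> 1).limit(1_000_000_000).parallel().forEach(i -> work());
        } catch (RuntimeException e) {
            System.out.println("Stream: caught " + e);
        }
        System.out.println("Stream: count = " + count.get());
    }
}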
Here's another twist. The example code that produces the IntStream does something like this:

IntStream.range(0, 1_000_000_000).parallel().forEach(..)

Change that to a generated stream, just like the second example in the code:

IntStream.generate(() -> 1).limit(1_000_000_000).parallel().forEach(..)

The outcome for this IntStream is the same: the exception is wrapped and rethrown, and the processing aborts. But the second stream will now also wrap and rethrow the exception, and it will not process more elements than the limit! Thus: changing how the first stream is produced has a side effect on how the second stream behaves. To me, this is very odd.
The JavaDoc of ForkJoinPool.invoke() and ForkJoinTask says that exceptions are rethrown, and this is what I would have expected from a parallel stream.
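For comparison, here is a small self-contained probe (my own illustration, not from the original post) of what that JavaDoc promises: an exception thrown inside a ForkJoinTask is reported to the caller of invoke():

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class InvokeRethrowDemo {
    public static void main(String[] args) {
        RecursiveAction failing = new RecursiveAction() {
            @Override
            protected void compute() {
                throw new RuntimeException("boom");
            }
        };
        try {
            ForkJoinPool.commonPool().invoke(failing);
        } catch (RuntimeException e) {
            // invoke() rethrows the task's exception in the caller's thread.
            System.out.println("caught in caller: " + e.getMessage());
        }
    }
}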
I encountered this "problem" when processing elements of a parallel stream obtained from Collection.stream().parallel() (I haven't verified the behavior of Collection.parallelStream(), but it should be the same). What happened was that a "worker thread" crashed and then went silently away, while all the other threads completed the stream successfully. My app uses a default exception handler that writes exceptions to a log file, but not even this log file was created. The thread and its exception simply disappeared.

Since I need to abort as soon as a runtime exception is caught, one alternative is to write code that leaks this exception to the other workers (sketched below), making them unwilling to proceed if an exception has been thrown by any other thread. Of course, even this does not stop the stream implementation from simply spawning new threads that try to complete the stream. So I will probably end up not using parallel streams and instead do "normal" concurrent programming using a thread pool/executor.
This shows that the problem of lost runtime exceptions is not isolated to streams generated by Stream.generate() or streams using Stream.limit(). The bottom line is that I would love to know: what is the expected behavior?
There is no difference in the behavior of these two streams regarding exception reporting; the problem is that you put both tests one after another into one method and let them access shared data structures.

There is a subtle, perhaps not sufficiently documented (if intended) behavior: when a stream operation completes exceptionally, it does not wait for the completion of all concurrent operations.

So when you catch the exception of the first stream operation, there are still some threads running and accessing your shared data. So when you reset your AtomicBoolean, one of these threads belonging to the first job will read the false value, turn it to true, print the message, and throw an exception which gets lost, as the stream operation has already completed exceptionally. Further, some of these threads will raise your counter after you reset it; that's why it ends up higher than the second job alone would allow. Your second job does not complete exceptionally, as all threads belonging to the second job will read a true value from the AtomicBoolean.
There are some ways to spot this.
When you remove the first stream operation, the second will complete exceptionally as expected. Also, inserting the statement
ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.DAYS);
between the two stream operations will fix the problem, as it waits for the completion of all threads.
However, the cleaner solution would be to let both stream operations use their own counter and flag.
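A minimal sketch of that cleaner structure, with per-job state so that stragglers from the first job cannot disturb the second job's bookkeeping (the class and names are mine, not from the original answer):

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;

public class IsolatedJobs {
    // Each job gets its own flag and counter.
    static IntConsumer job(AtomicBoolean thrown, AtomicLong count) {
        return i -> {
            count.incrementAndGet();
            if (thrown.compareAndSet(false, true))
                throw new RuntimeException("boom");
        };
    }

    public static void main(String[] args) {
        AtomicBoolean thrown1 = new AtomicBoolean();
        AtomicLong count1 = new AtomicLong();
        try {
            IntStream.range(0, 1_000_000_000).parallel().forEach(job(thrown1, count1));
        } catch (RuntimeException e) {
            System.out.println("first job: caught " + e + ", count = " + count1.get());
        }

        AtomicBoolean thrown2 = new AtomicBoolean();
        AtomicLong count2 = new AtomicLong();
        try {
            IntStream.generate(() -> 1).limit(1_000_000_000).parallel()
                     .forEach(job(thrown2, count2));
        } catch (RuntimeException e) {
            System.out.println("second job: caught " + e + ", count = " + count2.get());
        }
    }
}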
That said, there is a subtle, implementation-dependent difference that causes the problem to disappear if you just swap the two operations. The IntStream.range operation produces a stream with a known size, which allows splitting it into concurrent tasks that intrinsically know how many elements to process. This allows abandoning these tasks in the exceptional case, as described above. On the other hand, combining an infinite stream as returned by generate with limit does not produce a sized stream (though that would be possible). Since such a stream is treated as having an unknown size, the subtasks have to synchronize on a counter to ensure that the limit is obeyed. This causes the subtasks to (sometimes) complete even in the exceptional case. But, as said, that is a side effect of an implementation detail, not an intentional wait for completion. And since it's about concurrency, the result might be different if you run it multiple times.