
Inconsistent exception details in parallel stream

Most of the time, an exception thrown in a parallel stream doesn't have all of its attributes set (for example, its detail message).

For example:

@Test
public void test() {
    assertThatThrownBy(() -> Stream.of("1", "2", "asdf").parallel().forEach(Integer::parseInt))
            .asInstanceOf(InstanceOfAssertFactories.type(NumberFormatException.class))
            .extracting("detailMessage")
            .isEqualTo("For input string: \"asdf\"");
}

This fails most of the time, while this:

@Test
public void test() {
    assertThatThrownBy(() -> Stream.of("1", "2", "asdf").forEach(Integer::parseInt))
            .asInstanceOf(InstanceOfAssertFactories.type(NumberFormatException.class))
            .extracting("detailMessage")
            .isEqualTo("For input string: \"asdf\"");
}

succeeds 100% of the time.

Also, when the message is absent from the exception itself, it is present on the exception's cause. For example:

@Test
public void test() {
    assertThatThrownBy(() -> Stream.of("1", "2", "asdf").parallel().forEach(Integer::parseInt))
            .asInstanceOf(InstanceOfAssertFactories.type(NumberFormatException.class))
            .extracting("cause.detailMessage")
            .isEqualTo("For input string: \"asdf\"");
}

Any idea how to make the parallel stream throw the exact exception, and not some sort of nested monster?

asked Nov 28 '19 16:11 by dlhextall


1 Answer

Execution of a parallel stream consists of a set of tasks that are divided among several threads. Some of these threads come from a fork-join pool (typically the fork-join common pool), and the calling thread also participates. The distribution of tasks to threads is non-deterministic, so a given task (like calling parseInt on "asdf") might be executed on a thread from the pool, or it might be executed on the calling thread. You don't have any control over which thread executes any given task. This particular task throws an exception, so the question is how exceptions are handled when they occur on the different threads.
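One way to observe this non-determinism is to record which thread handles each element. This is an illustrative sketch: the class `WhichThread` and the method `threadPerElement` are hypothetical names, and the actual thread names (such as `main` or `ForkJoinPool.commonPool-worker-1`) depend on the JVM and on the number of available cores. On a machine where the common pool has no worker threads, everything may simply run on the calling thread.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;

// Hypothetical demo class: shows which thread processes each element.
final class WhichThread {
    // Records, for each element, the name of the thread that processed it.
    // A ConcurrentHashMap is used because several threads may write to it at once.
    static Map<String, String> threadPerElement(String... elements) {
        Map<String, String> seen = new ConcurrentHashMap<>();
        Stream.of(elements)
              .parallel()
              .forEach(e -> seen.put(e, Thread.currentThread().getName()));
        return seen;
    }

    public static void main(String[] args) {
        // The element-to-thread mapping may differ from one run to the next.
        for (int i = 0; i < 3; i++) {
            System.out.println(threadPerElement("1", "2", "asdf"));
        }
    }
}
```

The same variability decides whether `parseInt("asdf")` fails on the caller or on a pool thread.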

If the task is executed (and the exception thrown) on the calling thread, the other tasks are canceled, and the exception is rethrown to the caller. If the task is executed on a thread pool thread, the exception is caught, the other tasks are canceled, and the exception is wrapped within a new exception (of the same type, if possible) which is then thrown from the calling thread. The code that implements this has a comment that describes what it does:

/**
 * Returns a rethrowable exception for the given task, if
 * available. To provide accurate stack traces, if the exception
 * was not thrown by the current thread, we try to create a new
 * exception of the same type as the one thrown, but with the
 * recorded exception as its cause. If there is no such
 * constructor, we instead try to use a no-arg constructor,
 * followed by initCause, to the same effect. If none of these
 * apply, or any fail due to other exceptions, we return the
 * recorded exception, which is still correct, although it may
 * contain a misleading stack trace.
 *
 * @return the exception, or null if none
 */
private Throwable getThrowableException() { ... }
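The strategy described in that comment can be approximated in ordinary Java with reflection. This is a simplified sketch written for illustration, not the JDK's actual code: the `Rewrap` class and its `rewrap` method are hypothetical, and the real implementation additionally checks which thread recorded the exception before deciding to wrap it.

```java
import java.lang.reflect.Constructor;

// Hypothetical helper approximating the strategy in the comment above;
// this is NOT the JDK's implementation.
final class Rewrap {
    static Throwable rewrap(Throwable original) {
        try {
            Constructor<?> noArg = null;
            for (Constructor<?> c : original.getClass().getConstructors()) {
                Class<?>[] params = c.getParameterTypes();
                if (params.length == 1 && params[0] == Throwable.class) {
                    // Preferred: new X(original), keeping `original` as the cause.
                    return (Throwable) c.newInstance(original);
                }
                if (params.length == 0) {
                    noArg = c;
                }
            }
            if (noArg != null) {
                // Fallback: new X(), then attach the original as the cause.
                Throwable wrapper = (Throwable) noArg.newInstance();
                wrapper.initCause(original);
                return wrapper;
            }
        } catch (ReflectiveOperationException | RuntimeException e) {
            // Any failure: fall through and return the original exception.
        }
        // No usable constructor: return the recorded exception unchanged.
        return original;
    }
}
```

`NumberFormatException` has no public `(Throwable)` constructor, so this sketch takes the no-arg path for it: the result has a `null` message and the original exception as its cause, which is the same shape as the second stack trace in this answer.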

This wrapping is done to preserve information about the call path back to the calling code. If the task that threw the exception ran on the calling thread, the stack trace includes frames from the actual task execution all the way back to the caller. If it ran on a thread pool thread, the stack trace for that exception terminates in the fork-join framework. The wrapper exception, created on the calling thread, provides the additional frames that lead back to the caller. Without the wrapping, the stack trace from a thread pool thread would be incomplete, and it might be quite difficult to determine the root cause of the exception.
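The difference can be checked directly: an exception created on a pool worker records only that worker's frames, while one created on the calling thread includes the caller's frames. In this sketch, `StackContext`, `fromApplicationCode`, `createOnPoolThread`, and `hasFrame` are hypothetical names; a dedicated `ForkJoinPool` is used instead of the common pool so that the task should be executed by that pool's worker rather than by the caller.

```java
import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;

// Hypothetical demo: compares the stack traces of exceptions created on
// the calling thread and on a fork-join worker thread.
final class StackContext {
    // Stand-in for application code: creates one exception here, on the
    // calling thread, and obtains another that was created on a pool worker.
    static Throwable[] fromApplicationCode() {
        return new Throwable[] {
            new NumberFormatException("created on the calling thread"),
            createOnPoolThread()
        };
    }

    // Creates (but does not throw) an exception inside a task run by a
    // worker of a dedicated pool, and hands it back to the caller.
    static Throwable createOnPoolThread() {
        ForkJoinPool pool = new ForkJoinPool(1);
        try {
            return pool.submit(() -> new NumberFormatException("created on a pool thread")).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // True if t's own stack trace (causes excluded) has a frame for the
    // given method of this class.
    static boolean hasFrame(Throwable t, String methodName) {
        return Arrays.stream(t.getStackTrace())
                .anyMatch(f -> f.getClassName().equals(StackContext.class.getName())
                        && f.getMethodName().equals(methodName));
    }

    public static void main(String[] args) {
        Throwable[] ts = fromApplicationCode();
        System.out.println("caller-thread exception sees caller: " + hasFrame(ts[0], "fromApplicationCode"));
        System.out.println("pool-thread exception sees caller:   " + hasFrame(ts[1], "fromApplicationCode"));
    }
}
```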

Here's an example of a stack trace from an exception that occurred on the calling thread:

Exception in thread "main" java.lang.NumberFormatException: For input string: "asdf"
    at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.base/java.lang.Integer.parseInt(Integer.java:652)
    at java.base/java.lang.Integer.parseInt(Integer.java:770)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
    at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:746)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.helpCC(ForkJoinPool.java:1115)
    at java.base/java.util.concurrent.ForkJoinPool.externalHelpComplete(ForkJoinPool.java:1957)
    at java.base/java.util.concurrent.ForkJoinTask.tryExternalHelp(ForkJoinTask.java:378)
    at java.base/java.util.concurrent.ForkJoinTask.externalAwaitDone(ForkJoinTask.java:323)
    at java.base/java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:412)
    at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:736)
    at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
    at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:661)
    at ParallelStreamExceptions.main(ParallelStreamExceptions.java:31)

And here's an example of a stack trace from an exception that occurred on a thread pool thread:

Exception in thread "main" java.lang.NumberFormatException
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
    at java.base/java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:603)
    at java.base/java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:678)
    at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:737)
    at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
    at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:661)
    at ParallelStreamExceptions.main(ParallelStreamExceptions.java:31)
Caused by: java.lang.NumberFormatException: For input string: "asdf"
    at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.base/java.lang.Integer.parseInt(Integer.java:652)
    at java.base/java.lang.Integer.parseInt(Integer.java:770)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
    at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:746)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)

Note that the only application code visible in these stack traces is the method ParallelStreamExceptions.main; the rest is library code. The trace is easy enough to follow when the exception occurs on the calling thread. But suppose the original exception from a thread pool thread were simply rethrown in the calling thread, without wrapping. Its stack trace would be "misleading", as the comment says, because it would not contain a single stack frame from the application! Wrapping the exception provides the missing context.
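Both outcomes can be observed by running the question's pipeline several times and inspecting the exception that reaches the caller. This is a sketch; `WhereIsTheMessage` and `runPipeline` are hypothetical names, and the mix of outcomes varies from run to run and from machine to machine.

```java
import java.util.stream.Stream;

// Hypothetical demo: classifies the exception that reaches the caller.
final class WhereIsTheMessage {
    // Runs the question's pipeline and returns the NumberFormatException that
    // reaches the caller. Its form depends on which thread parsed "asdf".
    static NumberFormatException runPipeline() {
        try {
            Stream.of("1", "2", "asdf").parallel().forEach(Integer::parseInt);
        } catch (NumberFormatException e) {
            return e;
        }
        throw new AssertionError("expected a NumberFormatException");
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            NumberFormatException e = runPipeline();
            if (e.getCause() == null) {
                // Thrown on the calling thread: the original exception, message intact.
                System.out.println("original: " + e.getMessage());
            } else {
                // Thrown on a pool thread: a new NumberFormatException wrapping the original.
                System.out.println("wrapped: message=" + e.getMessage()
                        + ", cause message=" + e.getCause().getMessage());
            }
        }
    }
}
```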

Now, what to do about the unit test? There are a few alternatives.

First, you could simply check the exception type and not its detail message. In this example, checking for a NumberFormatException should work regardless of which thread throws the exception.
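With AssertJ this amounts to `assertThatThrownBy(...).isInstanceOf(NumberFormatException.class)`. The dependency-free sketch below shows the same idea; `TypeOnlyCheck` and `failsWithNumberFormatException` are hypothetical names. It relies on the behavior described above: whichever exception reaches the caller, the original or the fork-join wrapper, has the type NumberFormatException.

```java
import java.util.stream.Stream;

// Hypothetical helper: checks only the exception type, never the message.
final class TypeOnlyCheck {
    // True if parsing the elements in parallel fails with a NumberFormatException.
    // The detail message is ignored, so the result does not depend on which
    // thread parsed the bad element.
    static boolean failsWithNumberFormatException(Stream<String> input) {
        try {
            input.parallel().forEach(Integer::parseInt);
            return false;
        } catch (NumberFormatException e) {
            return true;
        }
    }
}
```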

Second, if you really want to check the detail message, you can write a custom assertion for it. There's probably an idiomatic AssertJ way to write this, but the logic would be something like: "assert that the caught exception is a NumberFormatException with the expected detail message, OR that the caught exception has a cause that is a NumberFormatException with the expected detail message."
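A minimal version of that logic, written as a plain Java predicate rather than as an AssertJ extension, might look like this. `MessageCheck` and `hasMessageOnSelfOrCause` are hypothetical names, and the sketch looks only one level down the cause chain, matching the single level of wrapping described above.

```java
// Hypothetical helper: accepts either the original exception or a wrapper
// whose direct cause is the original.
final class MessageCheck {
    // True if t, or its direct cause, is an instance of `type` whose detail
    // message equals `expected`.
    static boolean hasMessageOnSelfOrCause(Throwable t, Class<? extends Throwable> type, String expected) {
        return matches(t, type, expected) || matches(t.getCause(), type, expected);
    }

    // isInstance(null) is false, so getMessage() is never called on null.
    private static boolean matches(Throwable t, Class<? extends Throwable> type, String expected) {
        return type.isInstance(t) && expected.equals(t.getMessage());
    }
}
```

In an AssertJ test, a predicate like this could presumably be passed to `matches(...)` on the result of `assertThatThrownBy`, although a dedicated custom assertion class would give more informative failure messages.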

Third, you might want to reconsider what you're testing here. The work performed by the stream is parsing each stream element into an int. The example uses Integer::parseInt, but I'll assume that this is a stand-in for some application code that performs more sophisticated work. The point of a unit test should be to test that application code against a variety of inputs, not to test the stream execution framework.
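Following that suggestion, the parsing logic can be exercised directly, outside any stream, so the exception always comes from the test thread with its message intact. `ParseValueTest`, `parseValue`, and `failureMessage` are hypothetical names; `parseValue` merely delegates to `Integer.parseInt` in place of real application code. In a real project these checks would more likely live in a JUnit test (for example a parameterized one) than in a `main` method.

```java
// Hypothetical example of testing the per-element logic without a stream.
final class ParseValueTest {
    // Hypothetical stand-in for the application code the stream calls.
    static int parseValue(String s) {
        return Integer.parseInt(s);
    }

    // Calls parseValue directly on the current thread; returns the detail
    // message of the failure, or null if parsing succeeded.
    static String failureMessage(String input) {
        try {
            parseValue(input);
            return null;
        } catch (NumberFormatException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        for (String s : new String[] {"0", "1", "-42", "2147483647"}) {
            System.out.println(s + " -> " + parseValue(s));
        }
        for (String s : new String[] {"asdf", "", "2147483648"}) {
            System.out.println(s + " -> " + failureMessage(s));
        }
    }
}
```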

answered Sep 22 '22 07:09 by Stuart Marks