 

Getting ArrayIndexOutOfBoundsException when using parallel stream

I am occasionally getting an ArrayIndexOutOfBoundsException when using the following code. Any leads? The size of the list is always around 29-30.

logger.info("devicetripmessageinfo size :{}",deviceMessageInfoList.size());
deviceMessageInfoList.parallelStream().forEach(msg->{
    if(msg!=null && msg.getMessageVO()!=null)
    {
        
        DeviceTripMessageInfo currentDevTripMsgInfo = 
                        (DeviceTripMessageInfo) msg.getMessageVO();
        if(currentDevTripMsgInfo.getValueMap()!=null)
        {mapsList.add(currentDevTripMsgInfo.getValueMap());}
    }
});

java.lang.ArrayIndexOutOfBoundsException: null
        at java.base/jdk.internal.reflect.GeneratedConstructorAccessor26.newInstance(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
        at java.base/java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:603)
        at java.base/java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:678)
        at java.base/java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:737)
        at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
        at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
        at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
        at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:661)
        at com.*.*.*.*.worker.*.process(*.java:96)
        at com.*.jms.consumer.JMSWorker.processList(JMSWorker.java:279)
        at com.*.jms.consumer.JMSWorker.process(JMSWorker.java:244)
        at com.*.jms.consumer.JMSWorker.processMessages(JMSWorker.java:200)
        at com.*.jms.consumer.JMSWorker.run(JMSWorker.java:136)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ArrayIndexOutOfBoundsException: null
codemania23 asked Nov 07 '22 04:11



1 Answer

Summary

The problem is that ArrayList is, by design, not safe for modification by multiple threads at once, yet the parallel stream's forEach adds to mapsList from multiple threads concurrently. A good solution is to switch to an idiomatic stream implementation that collects the results instead of adding them as a side effect:

List msgList = deviceMessageInfoList.parallelStream() // Declare generic type, e.g. List<Map<String, Object>>
                .filter(Objects::nonNull)
                .map(m -> (DeviceTripMessageInfo) m.getMessageVO())
                .filter(Objects::nonNull)
                .map(DeviceTripMessageInfo::getValueMap)
                .filter(Objects::nonNull)
                .collect(Collectors.toUnmodifiableList());

Issue: concurrent modification

The ArrayList Javadocs explain the concurrent modification issue:

Note that this implementation is not synchronized. If multiple threads access an ArrayList instance concurrently, and at least one of the threads modifies the list structurally, it must be synchronized externally. This is typically accomplished by synchronizing on some object that naturally encapsulates the list. If no such object exists, the list should be "wrapped" using the Collections.synchronizedList method. This is best done at creation time, to prevent accidental unsynchronized access to the list.
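A minimal sketch of the wrapping the Javadoc describes, assuming a hypothetical SynchronizedListSketch class and plain integers standing in for the question's value maps. The list is wrapped at creation time so no unsynchronized reference exists; each add() then locks the wrapper, and iteration still needs a manual synchronized block.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;

public class SynchronizedListSketch {
    // Hypothetical helper: adds 0..n-1 from a parallel stream into a synchronized wrapper.
    static List<Integer> addInParallel(int n) {
        // Wrapped at creation time, so no unsynchronized ArrayList reference escapes.
        List<Integer> results = Collections.synchronizedList(new ArrayList<>());
        // Every add() locks the wrapper, so concurrent calls are safe, at the cost of
        // contention; the insertion order is whatever order the worker threads run in.
        IntStream.range(0, n).boxed().parallel().forEach(results::add);
        return results;
    }

    public static void main(String[] args) {
        List<Integer> results = addInParallel(10_000);
        long sum = 0;
        // Iteration is not covered by the wrapper's per-call locking; the
        // Collections.synchronizedList Javadoc requires synchronizing on the list here.
        synchronized (results) {
            for (int value : results) {
                sum += value;
            }
        }
        System.out.println(results.size() + " elements, sum " + sum);
    }
}
```

Unlike the original ArrayList, the wrapper should always end up with all 10,000 elements, though not in a predictable order.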

Note that the exception you're seeing is not the only incorrect behavior you might encounter. In my own tests of your code against large lists, the resulting list often contained only some of the elements from the source list.
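A rough reproduction sketch, assuming a hypothetical UnsafeParallelAddDemo class and plain integers in place of the question's value maps. The unsafe method repeats the question's pattern (adding to a plain ArrayList from a parallel forEach); its outcome depends on timing, so a run may report an exception, a short list, or occasionally a complete one. The collect-based method returns every element, in encounter order, on every run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class UnsafeParallelAddDemo {
    // Same shape as the question: side-effecting add() into a plain ArrayList.
    static String unsafeAdd(int n) {
        List<Integer> target = new ArrayList<>();
        try {
            IntStream.range(0, n).boxed().parallel().forEach(target::add);
            return "size " + target.size() + " of " + n;
        } catch (RuntimeException e) {
            // Typically ArrayIndexOutOfBoundsException, rethrown by the fork/join framework.
            return e.getClass().getSimpleName();
        }
    }

    // Side-effect-free alternative: the stream builds and merges the list itself.
    static List<Integer> safeCollect(int n) {
        return IntStream.range(0, n).boxed().parallel().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        int n = 100_000;
        for (int run = 0; run < 5; run++) {
            System.out.println("unsafe run " + run + ": " + unsafeAdd(n));
        }
        System.out.println("collect: size " + safeCollect(n).size() + " of " + n);
    }
}
```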

Note that while switching from a parallel stream to a sequential stream would likely fix the issue in practice, that fix relies on the current stream implementation rather than on anything the API guarantees. Such an approach is therefore highly inadvisable, since it could break in a future version of the library. Per the forEach Javadocs:

For any given element, the action may be performed at whatever time and in whatever thread the library chooses. If the action accesses shared state, it is responsible for providing the required synchronization.

Issue: not idiomatic

Aside from the correctness issue, another issue with this approach is that it's not particularly idiomatic to use side effects within stream code. The stream documentation explicitly discourages them.

Side-effects in behavioral parameters to stream operations are, in general, discouraged, as they can often lead to unwitting violations of the statelessness requirement, as well as other thread-safety hazards.

[...]

Many computations where one might be tempted to use side effects can be more safely and efficiently expressed without side-effects, such as using reduction instead of mutable accumulators.
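As a small illustration of "reduction instead of mutable accumulators", here is a sketch with a hypothetical ReductionSketch class that totals string lengths. Instead of updating a shared counter inside forEach, each parallel chunk produces a partial result and the stream combines the partials, so no shared mutable state is touched.

```java
import java.util.List;

public class ReductionSketch {
    // Specialized reduction: mapToInt(...).sum() combines per-chunk partial sums.
    static int totalLength(List<String> words) {
        return words.parallelStream()
                .mapToInt(String::length)
                .sum();
    }

    // Same result via the general reduce(identity, accumulator, combiner) form.
    static int totalLengthWithReduce(List<String> words) {
        return words.parallelStream()
                .reduce(0, (partial, word) -> partial + word.length(), Integer::sum);
    }

    public static void main(String[] args) {
        List<String> words = List.of("alpha", "beta", "gamma");
        System.out.println(totalLength(words));           // 14
        System.out.println(totalLengthWithReduce(words)); // 14
    }
}
```

The combiner (Integer::sum) is what lets the parallel stream merge partial results safely; it must be compatible with the accumulator for the result to be correct.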

Of particular note, the documentation goes on to describe the exact scenario posted in this question as an inappropriate use of side-effects in a stream:

As an example of how to transform a stream pipeline that inappropriately uses side-effects to one that does not, the following code searches a stream of strings for those matching a given regular expression, and puts the matches in a list.

     ArrayList<String> results = new ArrayList<>();
     stream.filter(s -> pattern.matcher(s).matches())
           .forEach(s -> results.add(s));  // Unnecessary use of side-effects!

This code unnecessarily uses side-effects. If executed in parallel, the non-thread-safety of ArrayList would cause incorrect results, and adding needed synchronization would cause contention, undermining the benefit of parallelism.

Aside: traditional non-stream solution

As an aside, this points to a solution that traditional, non-stream code might use. I will discuss it briefly, since it helps to understand the traditional fixes for concurrent list modification. Traditionally, one might replace the ArrayList with either a synchronized wrapper created with Collections.synchronizedList or an inherently concurrent collection type such as ConcurrentLinkedQueue. Since these are designed for concurrent insertion, they fix the parallel-insert problem, though possibly at the cost of extra contention overhead.
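A minimal sketch of the concurrent-collection variant, assuming a hypothetical ConcurrentQueueSketch class with integers in place of the question's value maps. ConcurrentLinkedQueue is a thread-safe, non-blocking queue, so the parallel forEach can add to it directly; the resulting order is not the source order.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.IntStream;

public class ConcurrentQueueSketch {
    // Hypothetical helper: the question's forEach-and-add pattern, with a thread-safe target.
    static Queue<Integer> addInParallel(int n) {
        Queue<Integer> results = new ConcurrentLinkedQueue<>();
        // add() is safe to call from many threads at once; no external locking is needed.
        IntStream.range(0, n).boxed().parallel().forEach(results::add);
        return results;
    }

    public static void main(String[] args) {
        Queue<Integer> results = addInParallel(10_000);
        // size() traverses the queue; it is accurate here because all adds have finished.
        System.out.println(results.size() + " elements");
    }
}
```

This removes the exception, but it still relies on a side effect inside forEach, which is what the stream documentation advises against.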

Stream solution

The stream documentation continues on with a replacement for the inappropriate use of side effects:

Furthermore, using side-effects here is completely unnecessary; the forEach() can simply be replaced with a reduction operation that is safer, more efficient, and more amenable to parallelization:

     List<String> results =
         stream.filter(s -> pattern.matcher(s).matches())
               .collect(Collectors.toList());  // No side-effects!

Applying this approach to your code, you get:

List msgList = deviceMessageInfoList.parallelStream() // Declare generic type, e.g. List<Map<String, Object>>
                .filter(Objects::nonNull)
                .map(m -> (DeviceTripMessageInfo) m.getMessageVO())
                .filter(Objects::nonNull)
                .map(DeviceTripMessageInfo::getValueMap)
                .filter(Objects::nonNull)
                .collect(Collectors.toUnmodifiableList());
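For a self-contained way to try this, here is a runnable sketch. The question does not show its classes, so DeviceMessage, the Object return type of getMessageVO(), and the Map<String, Object> returned by getValueMap() are hypothetical stand-ins; ValueMapCollector and sampleMessages() are likewise illustrative names. Collectors.toUnmodifiableList() requires Java 10 or later.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class ValueMapCollector {
    // Hypothetical stand-in for the question's DeviceTripMessageInfo.
    static class DeviceTripMessageInfo {
        private final Map<String, Object> valueMap;
        DeviceTripMessageInfo(Map<String, Object> valueMap) { this.valueMap = valueMap; }
        Map<String, Object> getValueMap() { return valueMap; }
    }

    // Hypothetical stand-in for the element type of deviceMessageInfoList.
    static class DeviceMessage {
        private final Object messageVO;
        DeviceMessage(Object messageVO) { this.messageVO = messageVO; }
        Object getMessageVO() { return messageVO; }
    }

    static List<Map<String, Object>> collectValueMaps(List<DeviceMessage> messages) {
        return messages.parallelStream()
                .filter(Objects::nonNull)                          // skip null messages
                .map(m -> (DeviceTripMessageInfo) m.getMessageVO())
                .filter(Objects::nonNull)                          // skip messages without a VO
                .map(DeviceTripMessageInfo::getValueMap)
                .filter(Objects::nonNull)                          // skip VOs without a value map
                .collect(Collectors.toUnmodifiableList());         // no shared mutable list
    }

    // Sample input covering each null case the original if-checks handled.
    static List<DeviceMessage> sampleMessages() {
        return Arrays.asList(
                new DeviceMessage(new DeviceTripMessageInfo(Map.of("speed", 42))),
                null,
                new DeviceMessage(null),
                new DeviceMessage(new DeviceTripMessageInfo(null)),
                new DeviceMessage(new DeviceTripMessageInfo(Map.of("odometer", 1200))));
    }

    public static void main(String[] args) {
        // prints [{speed=42}, {odometer=1200}]
        System.out.println(collectValueMaps(sampleMessages()));
    }
}
```

Because the source list is ordered, the collected list keeps the original encounter order even though the work runs in parallel.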
M. Justin answered Nov 09 '22 13:11
