I am trying to write a method that finds the indices of an object in a list of lists and takes advantage of parallelism. Here is my code.
// Returns [i, j] where lists.get(i).get(j) equals o, or null if o is not present.
public static int[] indices(List<? extends List<?>> lists, Object o) {
    return IntStream.range(0, lists.size())
            .boxed()
            .flatMap(i -> IntStream.range(0, lists.get(i).size())
                                   .mapToObj(j -> new int[]{i, j}))
            .parallel()
            .filter(a -> {
                System.out.println(Arrays.toString(a)); // For testing only
                return Objects.equals(o, lists.get(a[0]).get(a[1]));
            })
            .findAny()
            .orElse(null);
}
When I run the following code
List<List<String>> lists = Arrays.asList(
        Arrays.asList("A", "B", "C"),
        Arrays.asList("D", "E", "F", "G"),
        Arrays.asList("H", "I"),
        Collections.nCopies(5, "J")
);
System.out.println("Indices are " + Arrays.toString(indices(lists, "J")));
the output is something like
[0, 0]
[0, 1]
[0, 2]
[3, 0]
[3, 1]
[3, 2]
[3, 3]
[2, 0]
[3, 4]
[1, 0]
[1, 1]
[2, 1]
[1, 2]
[1, 3]
Indices are [3, 0]
In other words, the search continues even after the object has been found. Isn't findAny supposed to be a short-circuiting operation? What am I missing? Also, what is the best way to take advantage of parallelism when iterating over a list of lists or a jagged array?
EDIT
Following the idea in @Sotirios's answer, I got output like this:
Thread[ForkJoinPool.commonPool-worker-3,5,main] [3, 0]
Thread[main,5,main] [2, 0]
Thread[main,5,main] [2, 1]
Thread[ForkJoinPool.commonPool-worker-1,5,main] [1, 0]
Thread[ForkJoinPool.commonPool-worker-1,5,main] [1, 1]
Thread[ForkJoinPool.commonPool-worker-1,5,main] [1, 2]
Thread[ForkJoinPool.commonPool-worker-1,5,main] [1, 3]
Thread[main,5,main] [0, 0]
Thread[main,5,main] [0, 1]
Thread[ForkJoinPool.commonPool-worker-3,5,main] [3, 1]
Thread[main,5,main] [0, 2]
Thread[ForkJoinPool.commonPool-worker-3,5,main] [3, 2]
Thread[ForkJoinPool.commonPool-worker-3,5,main] [3, 3]
Thread[ForkJoinPool.commonPool-worker-3,5,main] [3, 4]
Indices are [3, 0]
Notice that Thread[ForkJoinPool.commonPool-worker-3,5,main] continues searching even after the answer is found.
Short-circuiting operations do not guarantee to pull only as few elements as it takes to produce their result. They may do so, but it is not required. The current implementation of flatMap is such that it always pushes the substream's entire contents downstream. So even if your stream weren't parallel, you could see more elements flow through the pipeline than it takes to satisfy findAny.
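To see this without parallelism at all, here is a minimal, self-contained sketch (mine, not part of the original answer). On a JDK whose flatMap is not lazy (such as Java 8), it prints every element of the first substream even though findFirst needs only one; later JDKs made flatMap lazier, so the exact output can vary by version.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class FlatMapDemo {
    public static void main(String[] args) {
        Stream.of(Arrays.asList(1, 2, 3), Arrays.asList(4, 5))
              .flatMap(List::stream)
              .peek(n -> System.out.println("pulled " + n)) // logs what actually flows downstream
              .findFirst()                                   // short-circuiting: needs one element
              .ifPresent(n -> System.out.println("found " + n));
    }
}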
As for why it was implemented this way: the problem lies deep in the Stream API implementation. The lambda passed to flatMap often creates a stream with intermediate operations of its own (like .flatMap(list -> list.stream().map(...).filter(...))). The flatMap implementation could call stream.spliterator() on that substream and invoke tryAdvance repeatedly until cancellation is requested. However, when the substream contains intermediate operations, the spliterator() call returns a somewhat artificial spliterator (if it doesn't, it simply returns the original source spliterator), and that artificial spliterator has a rather inefficient tryAdvance() implementation. Using it could therefore be a bigger performance hit than consuming the whole flat-mapped substream. In many cases you flatMap to short substreams, so the current implementation often works in your favor.
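If the goal is the indices method from the question, one workaround (a sketch of mine, assuming the same imports as the question's code; the name indicesByRow is made up here) is to drop flatMap and let List.indexOf do the inner search, so findAny short-circuits at the granularity of whole rows:

// Sketch: each row is searched as a unit by indexOf, so no per-element
// pairs are pushed through flatMap and findAny can give up row by row.
public static int[] indicesByRow(List<? extends List<?>> lists, Object o) {
    return IntStream.range(0, lists.size())
            .parallel()
            .mapToObj(i -> new int[]{i, lists.get(i).indexOf(o)}) // inner search per row
            .filter(a -> a[1] >= 0)                               // keep only rows that contain o
            .findAny()
            .orElse(null);
}

The trade-off is that a single long row is no longer split across threads, which is usually acceptable when the rows are short.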
It's not that it continues; it's that it has already dispatched a number of threads to try to find the result and waits until those have completed before returning it. In other words, the findAny terminal operation submits the "search" task to a number of threads. These tasks simply apply the filter Predicate and return when something evaluates to true. findAny, presumably, waits for one of them to return a value. There's no way for it to cancel anything it has already submitted, and it seems this implementation blocks until the entire batch returns; it can only stop submitting future batches.
You can verify this by logging the current thread:
System.out.println(Thread.currentThread() + " " + Arrays.toString(a)); // For testing only
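For context, that line replaces the test print inside the question's filter, roughly like this:

.filter(a -> {
    System.out.println(Thread.currentThread() + " " + Arrays.toString(a)); // For testing only
    return Objects.equals(o, lists.get(a[0]).get(a[1]));
})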