
Using streams to find an object in a list of lists [duplicate]

I am trying to write a method that finds the indices of an object in a list of lists and takes advantage of parallelism. Here is my code.

// returns [i, j] where lists.get(i).get(j) equals o, or null if o is not present.
public static int[] indices(List<? extends List<?>> lists, Object o) {
    return IntStream.range(0, lists.size())
                    .boxed()
                    .flatMap(i -> IntStream.range(0, lists.get(i).size()).mapToObj(j -> new int[]{i, j}))
                    .parallel()
                    .filter(a -> {
                        System.out.println(Arrays.toString(a));     // For testing only
                        return Objects.equals(o, lists.get(a[0]).get(a[1]));
                    })
                    .findAny()
                    .orElse(null);
}

When I run the following code

List<List<String>> lists = Arrays.asList(
        Arrays.asList("A", "B", "C"),
        Arrays.asList("D", "E", "F", "G"),
        Arrays.asList("H", "I"),
        Collections.nCopies(5, "J")
);
System.out.println("Indices are " + Arrays.toString(indices(lists, "J")));

the output is something like

[0, 0]
[0, 1]
[0, 2]
[3, 0]
[3, 1]
[3, 2]
[3, 3]
[2, 0]
[3, 4]
[1, 0]
[1, 1]
[2, 1]
[1, 2]
[1, 3]
Indices are [3, 0]

In other words, the search continues even after the object has been found. Isn't findAny supposed to be a short-circuiting operation? What am I missing? Also, what is the best way to take advantage of parallelism when iterating over a list of lists or a jagged array?

EDIT

Following the idea in @Sotirios's answer, I got an output of

Thread[ForkJoinPool.commonPool-worker-3,5,main] [3, 0]
Thread[main,5,main] [2, 0]
Thread[main,5,main] [2, 1]
Thread[ForkJoinPool.commonPool-worker-1,5,main] [1, 0]
Thread[ForkJoinPool.commonPool-worker-1,5,main] [1, 1]
Thread[ForkJoinPool.commonPool-worker-1,5,main] [1, 2]
Thread[ForkJoinPool.commonPool-worker-1,5,main] [1, 3]
Thread[main,5,main] [0, 0]
Thread[main,5,main] [0, 1]
Thread[ForkJoinPool.commonPool-worker-3,5,main] [3, 1]
Thread[main,5,main] [0, 2]
Thread[ForkJoinPool.commonPool-worker-3,5,main] [3, 2]
Thread[ForkJoinPool.commonPool-worker-3,5,main] [3, 3]
Thread[ForkJoinPool.commonPool-worker-3,5,main] [3, 4]
Indices are [3, 0]

Notice that

Thread[ForkJoinPool.commonPool-worker-3,5,main]

continues searching even after the answer is found.

Paul Boddington asked Nov 21 '15 02:11




3 Answers

Short-circuiting operations are not guaranteed to pull only as many elements as they need to produce their result. They may do so, but it is not required.

The current implementation of flatMap is such that it will always push the substream's entire contents downstream. So even if your stream weren't parallel, you could see more elements flow through the stream than it takes to satisfy findAny.
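To see this effect without any parallelism, here is a minimal sketch that counts how many flattened elements reach the stage right after flatMap before findFirst returns. The class and method names (FlatMapShortCircuitDemo, elementsSeen) are made up for illustration. The exact count depends on the JDK version: on the implementation described above the whole first sublist is pushed through, while later JDK releases may stop earlier.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

class FlatMapShortCircuitDemo {

    // Hypothetical helper: returns how many flattened elements passed the
    // peek() stage of a sequential stream before findFirst() produced a result.
    static int elementsSeen(List<List<String>> lists, String target) {
        AtomicInteger seen = new AtomicInteger();
        Optional<String> hit = lists.stream()
                .flatMap(List::stream)
                .peek(s -> seen.incrementAndGet())   // counts every element pushed downstream
                .filter(target::equals)
                .findFirst();
        if (!hit.isPresent()) {
            throw new IllegalArgumentException(target + " is not present");
        }
        return seen.get();
    }

    public static void main(String[] args) {
        List<List<String>> lists = Arrays.asList(
                Arrays.asList("A", "B", "C"),
                Arrays.asList("D", "E"));
        // "A" is the very first element, so ideally only one element would be seen.
        // A count above 1 means flatMap pushed more of the sublist than was needed.
        System.out.println("Elements seen: " + elementsSeen(lists, "A"));
    }
}
```

If the printed count is larger than 1, the extra elements came from the first sublist being pushed downstream in full, not from other threads.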

Misha answered Oct 21 '22 01:10


As for why it was implemented this way: the problem lies deep in the Stream API implementation. The flatMap body often creates a stream with some intermediate operations (like .flatMap(list -> list.stream().map(...).filter(...))). The flatMap implementation could call stream.spliterator() and then call tryAdvance repeatedly until cancellation is requested. However, spliterator() returns a somewhat artificial spliterator when the stream contains intermediate operations (otherwise it just returns the original stream's spliterator). This artificial spliterator does not have a very efficient tryAdvance() implementation, so using it might be a worse performance drawback than consuming the whole flatMapped stream. In many cases you flatMap to short streams, so the current implementation may actually give you a performance gain.
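When the inner lists are short, as in the question, one way to sidestep flatMap altogether is to parallelise only over the outer index and search each inner list sequentially with List.indexOf. This is a sketch of that idea, not something taken from the answers here; the class name OuterParallelIndices is made up, and whether it is actually faster depends on the data and should be measured.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.IntStream;

class OuterParallelIndices {

    // Returns [i, j] where lists.get(i).get(j) equals o, or null if o is not present.
    // Only the outer index range is processed in parallel; each inner list is
    // scanned sequentially by indexOf, which stops at its first match.
    static int[] indices(List<? extends List<?>> lists, Object o) {
        return IntStream.range(0, lists.size())
                .parallel()
                .mapToObj(i -> {
                    int j = lists.get(i).indexOf(o);
                    return j < 0 ? null : new int[]{i, j};
                })
                .filter(Objects::nonNull)   // drop inner lists that do not contain o
                .findAny()
                .orElse(null);
    }

    public static void main(String[] args) {
        List<List<String>> lists = Arrays.asList(
                Arrays.asList("A", "B", "C"),
                Arrays.asList("D", "E", "F", "G"),
                Arrays.asList("H", "I"),
                Collections.nCopies(5, "J"));
        System.out.println("Indices are " + Arrays.toString(indices(lists, "J"))); // prints Indices are [3, 0]
    }
}
```

Other inner lists may still be scanned after a match is found, since findAny gives no guarantee about work already in flight, but no single inner list is scanned past its own first match.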

Tagir Valeev answered Oct 21 '22 03:10


It's not that it continues, it's that it has already dispatched all sorts of threads to try and find the result and will wait until those have completed before returning the result.

In other words, the findAny terminal operation will submit the "search" task to a number of threads. These tasks are simply applying the filter Predicate and returning when something returns true. findAny, presumably, waits for one of these to return a value. There's no way for it to really cancel anything it has already submitted and it seems this implementation will block until the entire batch returns. It can only stop submitting any future batches.

You can verify this by logging the current thread:

System.out.println(Thread.currentThread() + " " + Arrays.toString(a)); // For testing only

Sotirios Delimanolis answered Oct 21 '22 02:10