I want to have something similar to Collectors.maxBy(), a collector that gets the top elements in a collection (maxBy only gets one).
I have a stream of Possibility objects that can be scored with an Integer score(Possibility) method.
First I tried:
    List<Possibility> possibilities = getPossibilityStream()
            .parallel()
            .collect(Collectors.toList());
    if (!possibilities.isEmpty()) {
        int bestScore = possibilities.stream()
                .mapToInt(p -> score(p))
                .max()
                .getAsInt();
        possibilities = possibilities.stream()
                .filter(p -> score(p) == bestScore)
                .collect(Collectors.toList());
    }
But doing that, I scan the collection three times: once to build it, a second time to get the top score, and a third time to filter it, which is not optimal. Moreover, the number of possibilities could be huge (>10^12).
The best way would be to get the top possibilities directly in the first collect, but there seems to be no built-in collector that does such a thing.
So I implemented my own Collector:
    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.EnumSet;
    import java.util.List;
    import java.util.Set;
    import java.util.function.BiConsumer;
    import java.util.function.BinaryOperator;
    import java.util.function.Function;
    import java.util.function.Supplier;
    import java.util.stream.Collector;

    public class BestCollector<E> implements Collector<E, List<E>, List<E>> {

        private final Comparator<E> comparator;
        private final Class<? extends List> listImpl;

        public BestCollector(Comparator<E> comparator, Class<? extends List> listImpl) {
            this.comparator = comparator;
            this.listImpl = listImpl;
        }

        public BestCollector(Comparator<E> comparator) {
            this.comparator = comparator;
            listImpl = ArrayList.class;
        }

        @Override
        public Supplier<List<E>> supplier() {
            return () -> {
                try {
                    return listImpl.newInstance();
                } catch (InstantiationException | IllegalAccessException ex) {
                    throw new RuntimeException(ex);
                }
            };
        }

        @Override
        public BiConsumer<List<E>, E> accumulator() {
            return (list, e) -> {
                if (list.isEmpty()) {
                    list.add(e);
                } else {
                    final int comparison = comparator.compare(list.get(0), e);
                    if (comparison == 0) {
                        list.add(e);
                    } else if (comparison < 0) {
                        list.clear();
                        list.add(e);
                    }
                }
            };
        }

        @Override
        public BinaryOperator<List<E>> combiner() {
            return (l1, l2) -> {
                final int comparison = comparator.compare(l1.get(0), l2.get(0));
                if (comparison == 0) {
                    l1.addAll(l2);
                    return l1;
                } else if (comparison < 0) {
                    return l2;
                } else {
                    return l1;
                }
            };
        }

        @Override
        public Function<List<E>, List<E>> finisher() {
            return Function.identity();
        }

        @Override
        public Set<Characteristics> characteristics() {
            return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.CONCURRENT, Characteristics.UNORDERED);
        }
    }
And then:
    List<Possibility> possibilities = getPossibilityStream()
            .parallel()
            .collect(new BestCollector<Possibility>((p1, p2) -> score(p1).compareTo(score(p2))));
And that does the job in sequential mode (without the .parallel()), but in parallel mode there are occasionally some exceptions, in two spots:
A java.lang.IndexOutOfBoundsException: Index: 0, Size: 0 in the line:
    final int comparison = comparator.compare(list.get(0), e);
of the accumulator() method. I understand it happens when list.clear() is called between list.isEmpty() and list.get(0).
A java.lang.NullPointerException in the score(Possibility) method, because the possibility is null. Again, the same line is involved:
    final int comparison = comparator.compare(list.get(0), e);
I don't understand how list.get(0) could return null...
In parallel mode, list.get(0) sometimes raises an IndexOutOfBoundsException and sometimes returns null.
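The first spot is a classic check-then-act race: isEmpty() and get(0) are two separate steps, and while the collector reports CONCURRENT, another thread may run the accumulator on the same list in between. A real race is nondeterministic, so the sketch below (the class CheckThenActReplay is hypothetical, not part of the question's code) replays that interleaving deterministically in a single thread, with plain statements standing in for the two threads.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration: replays, in one thread, the interleaving
// "thread A checks isEmpty(), thread B clears the list, thread A calls get(0)".
class CheckThenActReplay {

    static String replay() {
        List<Integer> shared = new ArrayList<>(List.of(42));

        // Thread A: the check passes because the list still holds one element.
        boolean emptyAtCheck = shared.isEmpty();

        // Thread B: a concurrent accumulator call runs list.clear() in between.
        shared.clear();

        // Thread A: acts on the now stale result of its check.
        if (!emptyAtCheck) {
            try {
                shared.get(0);
                return "no exception";
            } catch (IndexOutOfBoundsException ex) {
                return "IndexOutOfBoundsException";
            }
        }
        return "list was empty at check time";
    }

    public static void main(String[] args) {
        System.out.println(replay()); // prints IndexOutOfBoundsException
    }
}
```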
I understand that my code is not thread safe, so I tried several solutions:
- adding synchronized to all methods of BestCollector: public synchronized …
- replacing ArrayList with java.util.concurrent.CopyOnWriteArrayList
- adding synchronized and using CopyOnWriteArrayList at the same time
- removing Characteristics.CONCURRENT from the Set<Characteristics> returned by the characteristics() method:
    @Override
    public Set<Characteristics> characteristics() {
        return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED);
    }
But I don't know whether Characteristics.CONCURRENT is there to indicate that my code is thread safe or that my code will be used in concurrent processing.
But none of these solutions actually solve the problem.
In fact, when I remove CONCURRENT from the characteristics, there is sometimes a java.lang.IndexOutOfBoundsException: Index: 0, Size: 0, but in the line:
    final int comparison = comparator.compare(l1.get(0), l2.get(0));
of the combiner() method.
However, the exceptions raised by the accumulator() method no longer seem to occur.
@Holger's answer is right. The complete solution is to change both the combiner() and characteristics() methods:
    @Override
    public BinaryOperator<List<E>> combiner() {
        return (l1, l2) -> {
            if (l1.isEmpty()) {
                return l2;
            } else if (l2.isEmpty()) {
                return l1;
            } else {
                final int comparison = comparator.compare(l1.get(0), l2.get(0));
                if (comparison == 0) {
                    l1.addAll(l2);
                    return l1;
                } else if (comparison < 0) {
                    return l2;
                } else {
                    return l1;
                }
            }
        };
    }

    @Override
    public Set<Characteristics> characteristics() {
        return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED);
    }
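For comparison, here is a sketch of the same corrected logic written with the Collector.of factory method, which adds IDENTITY_FINISH on its own. The names BestCollectors, best and demo, and the i % 10 score, are hypothetical stand-ins for the question's Possibility and score(Possibility). Using ArrayList::new instead of the reflective listImpl also guarantees a fresh container on every supplier call.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.IntStream;

class BestCollectors {

    // Hypothetical helper: collects every element that compares as maximal.
    // Not CONCURRENT, so each container is only ever used by one thread.
    static <E> Collector<E, List<E>, List<E>> best(Comparator<? super E> comparator) {
        return Collector.of(
                ArrayList::new, // a fresh container on every supplier call
                (list, e) -> {
                    if (list.isEmpty()) {
                        list.add(e);
                        return;
                    }
                    int c = comparator.compare(list.get(0), e);
                    if (c == 0) {
                        list.add(e);   // ties with the current best
                    } else if (c < 0) {
                        list.clear();  // strictly better element: start over
                        list.add(e);
                    }
                },
                (l1, l2) -> {
                    if (l1.isEmpty()) {
                        return l2;     // a partial result may be empty
                    }
                    if (l2.isEmpty()) {
                        return l1;
                    }
                    int c = comparator.compare(l1.get(0), l2.get(0));
                    if (c == 0) {
                        l1.addAll(l2);
                        return l1;
                    }
                    return c < 0 ? l2 : l1;
                },
                Collector.Characteristics.UNORDERED); // IDENTITY_FINISH is added by Collector.of
    }

    // Hypothetical score i % 10: the values 9, 19, ..., 99 share the top score 9.
    static List<Integer> demo() {
        Comparator<Integer> byScore = Comparator.comparingInt(i -> i % 10);
        return IntStream.range(0, 100).boxed()
                .parallel()
                .collect(best(byScore));
    }

    public static void main(String[] args) {
        System.out.println(demo().size()); // prints 10
    }
}
```

Running demo() should yield the ten tied values in an unspecified order, whatever way the parallel stream happens to split the range.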
Your code has only one significant error: if your collector is not thread safe, it shouldn’t report Characteristics.CONCURRENT, as that is exactly claiming that it is thread safe.
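The Javadoc of Collector.Characteristics.CONCURRENT describes it as a promise that the result container supports the accumulator function being called concurrently, from multiple threads, on the same container. The JDK’s own collectors follow this: one backed by a ConcurrentMap reports it, one backed by a plain ArrayList does not. A small sketch to check this (CharacteristicsCheck and isConcurrent are hypothetical names):

```java
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;

class CharacteristicsCheck {

    // Hypothetical helper: true if the collector claims its accumulator may be
    // called concurrently on one shared result container.
    static boolean isConcurrent(Collector<?, ?, ?> collector) {
        return collector.characteristics().contains(Collector.Characteristics.CONCURRENT);
    }

    public static void main(String[] args) {
        // toConcurrentMap accumulates into a ConcurrentMap, so it reports CONCURRENT.
        System.out.println(isConcurrent(
                Collectors.toConcurrentMap(Function.<String>identity(), String::length))); // true
        // toList accumulates into a plain ArrayList, so it does not.
        System.out.println(isConcurrent(Collectors.<String>toList())); // false
    }
}
```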
The important point you have to understand is that for non-CONCURRENT collectors, the framework will perform the necessary steps to use them in a thread-safe but still efficient manner:
- each worker thread obtains its own new container via supplier()
- each worker thread uses the accumulator() function together with its own local container
- combiner() will be used once two worker threads have finished their work
- finisher() will be used when all worker threads have finished their work and all containers have been combined
So all you have to do is ensure that your supplier truly returns a new instance on each invocation, that all functions are non-interfering and side-effect free (regarding anything other than the containers they receive as arguments) and, of course, that your collector does not report Characteristics.CONCURRENT when it isn’t a concurrent collector.
You need neither the synchronized keyword nor concurrent collections here.
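The steps described above can be modelled in a single thread as a rough sketch, assuming a fixed, hypothetical split of the input into chunks (NonConcurrentModel and simulate are illustrative names); real parallel streams split via Spliterator and merge partial results in a tree on the fork/join pool. Each chunk gets its own container from supplier(), so nothing needs synchronization, and a chunk without elements still produces an empty container that reaches the combiner.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;

// Hypothetical, simplified model of how a parallel stream drives a
// non-CONCURRENT collector. Assumes at least one chunk.
class NonConcurrentModel {

    static <T, A, R> R simulate(List<List<T>> chunks, Collector<T, A, R> collector) {
        Supplier<A> supplier = collector.supplier();
        BiConsumer<A, T> accumulator = collector.accumulator();
        BinaryOperator<A> combiner = collector.combiner();

        List<A> partials = new ArrayList<>();
        for (List<T> chunk : chunks) {
            A container = supplier.get();         // a fresh container per chunk
            for (T t : chunk) {
                accumulator.accept(container, t); // only this "worker" touches it
            }
            partials.add(container);              // may still be empty
        }

        // Merge partial results pairwise (left to right here, a tree in practice).
        A result = partials.get(0);
        for (int i = 1; i < partials.size(); i++) {
            result = combiner.apply(result, partials.get(i));
        }
        return collector.finisher().apply(result); // runs once, at the very end
    }

    public static void main(String[] args) {
        List<List<Integer>> chunks = List.of(List.of(1, 2), List.of(), List.of(3));
        System.out.println(simulate(chunks, Collectors.toList())); // prints [1, 2, 3]
    }
}
```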
By the way, a Comparator of the form (p1, p2) -> score(p1).compareTo(score(p2)) can be implemented using Comparator.comparing(p -> score(p)) or, if the score value is an int, Comparator.comparingInt(p -> score(p)).
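A quick sketch comparing the three forms, assuming a hypothetical score(String) that returns the string length as an Integer in place of the question’s score(Possibility) (ComparatorForms and allAgree are illustrative names):

```java
import java.util.Comparator;
import java.util.List;

class ComparatorForms {

    // Hypothetical stand-in for the question's score(Possibility) method.
    static Integer score(String p) {
        return p.length();
    }

    // The hand-written form from the question.
    static final Comparator<String> MANUAL = (p1, p2) -> score(p1).compareTo(score(p2));
    // Key extraction with boxed keys compared by their natural order.
    static final Comparator<String> COMPARING = Comparator.comparing(p -> score(p));
    // Key extraction with a primitive int key (the Integer is unboxed here).
    static final Comparator<String> COMPARING_INT = Comparator.comparingInt(p -> score(p));

    // True if all three forms agree on the sign of every pairwise comparison.
    static boolean allAgree(List<String> samples) {
        for (String x : samples) {
            for (String y : samples) {
                int m = Integer.signum(MANUAL.compare(x, y));
                if (m != Integer.signum(COMPARING.compare(x, y))
                        || m != Integer.signum(COMPARING_INT.compare(x, y))) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(allAgree(List.of("a", "bb", "cc", "ddd"))); // prints true
    }
}
```

comparingInt accepts the Integer result here because it is unboxed to int; with a score that already returns a primitive int, it avoids boxing altogether.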
Finally, your combiner function does not check whether one of the lists is empty. This perfectly explains an IndexOutOfBoundsException within the combiner, while the IndexOutOfBoundsException within the accumulator is the result of your collector reporting Characteristics.CONCURRENT…
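In the JDK’s implementation, empty lists reach the combiner whenever a parallel chunk contributes no elements (for example, when an upstream filter drops all of them): supplier() is still called for that chunk, and its empty container is merged later. A sketch contrasting the question’s original combiner logic with the fixed one, using a hypothetical class CombinerEmptyCheck and Integer elements in natural order:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.BinaryOperator;

class CombinerEmptyCheck {

    // Combiner logic as in the question: assumes both partial lists are non-empty.
    static <E> BinaryOperator<List<E>> unchecked(Comparator<? super E> cmp) {
        return (l1, l2) -> {
            int c = cmp.compare(l1.get(0), l2.get(0)); // throws if either list is empty
            if (c == 0) {
                l1.addAll(l2);
                return l1;
            }
            return c < 0 ? l2 : l1;
        };
    }

    // Combiner logic as in the fix: an empty partial result is simply skipped.
    static <E> BinaryOperator<List<E>> checked(Comparator<? super E> cmp) {
        BinaryOperator<List<E>> merge = unchecked(cmp);
        return (l1, l2) -> l1.isEmpty() ? l2 : l2.isEmpty() ? l1 : merge.apply(l1, l2);
    }

    public static void main(String[] args) {
        Comparator<Integer> cmp = Comparator.naturalOrder();
        // The empty list stands in for the container of a chunk whose
        // elements were all filtered out before reaching the accumulator.
        List<Integer> partial = new ArrayList<>(List.of(7));
        List<Integer> emptyPartial = new ArrayList<>();

        System.out.println(checked(cmp).apply(partial, emptyPartial)); // prints [7]
        try {
            unchecked(cmp).apply(partial, emptyPartial);
        } catch (IndexOutOfBoundsException ex) {
            System.out.println("unchecked combiner failed: " + ex.getClass().getSimpleName());
        }
    }
}
```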
It’s also important to understand that adding the synchronized keyword to an accumulator() or combiner() method does not guard the function constructed via a lambda expression. It guards the method that constructs the function instance, but not the function’s code itself. In contrast to an inner class, there is no way to add a synchronized keyword to the actual function’s implementation method.
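The difference can be observed with Thread.holdsLock. In the sketch below (SynchronizedLambdaDemo is a hypothetical class), the synchronized method holds the lock only while it creates the lambda, so the lambda itself runs without it. If guarding the function’s code were ever needed, a synchronized block inside the lambda body is one way to do it, although for a non-CONCURRENT collector it is unnecessary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

class SynchronizedLambdaDemo {

    // 'synchronized' only locks 'this' while the lambda object is being created.
    synchronized BiConsumer<List<String>, String> accumulator() {
        return (list, e) -> list.add(e + ":" + Thread.holdsLock(this));
    }

    // To guard the function's own code, the lock has to be taken inside the body.
    BiConsumer<List<String>, String> lockingAccumulator() {
        return (list, e) -> {
            synchronized (this) {
                list.add(e + ":" + Thread.holdsLock(this));
            }
        };
    }

    public static void main(String[] args) {
        SynchronizedLambdaDemo demo = new SynchronizedLambdaDemo();
        List<String> out = new ArrayList<>();
        demo.accumulator().accept(out, "plain");
        demo.lockingAccumulator().accept(out, "locked");
        System.out.println(out); // prints [plain:false, locked:true]
    }
}
```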