How to implement a thread-safe Collector?

I want to have something similar to Collectors.maxBy(), a collector that gets the top elements in a collection (maxBy only gets one).

I have a stream of Possibility objects that could be scored with an Integer score(Possibility) method.

First I tried:

List<Possibility> possibilities = getPossibilityStream()
    .parallel()
    .collect(Collectors.toList());

if(!possibilities.isEmpty()) {
    int bestScore = possibilities.stream()
        .mapToInt(p -> score(p))
        .max()
        .getAsInt();
    possibilities = possibilities.stream()
        .filter(p -> score(p)==bestScore)
        .collect(Collectors.toList());
}

But doing that, I traverse the data three times: once to build the list, a second time to get the top score, and a third time to filter it, which is not optimal. Moreover, the number of possibilities could be huge (more than 10^12).

The best way would be to get the top possibilities directly in the first collect, but there seems to be no built-in collector that does this.

So I implemented my own Collector:

public class BestCollector<E> implements Collector<E, List<E>, List<E>> {

    private final Comparator<E> comparator;

    private final Class<? extends List> listImpl ;

    public BestCollector(Comparator<E> comparator, Class<? extends List> listImpl) {
        this.comparator = comparator;
        this.listImpl = listImpl;
    }

    public BestCollector(Comparator<E> comparator) {
        this.comparator= comparator;
        listImpl = ArrayList.class;
    }

    @Override
    public Supplier<List<E>> supplier() {
        return () -> {
            try {
                return listImpl.newInstance();
            } catch (InstantiationException | IllegalAccessException ex) {
                throw new RuntimeException(ex);
            }
        };
    }

    @Override
    public BiConsumer<List<E>, E> accumulator() {
        return (list, e) -> {
            if (list.isEmpty()) {
                list.add(e);
            } else {
                final int comparison = comparator.compare(list.get(0), e);
                if (comparison == 0) {
                    list.add(e);
                } else if (comparison < 0) {
                    list.clear();
                    list.add(e);
                }
            }
        };
    }

    @Override
    public BinaryOperator<List<E>> combiner() {
        return (l1, l2) -> {
            final int comparison = comparator.compare(l1.get(0), l2.get(0));
            if (comparison == 0) {
                l1.addAll(l2);
                return l1;
            } else if (comparison < 0) {
                return l2;
            } else {
                return l1;
            }
        };
    }

    @Override
    public Function<List<E>, List<E>> finisher() {
        return Function.identity();
    }

    @Override
    public Set<Characteristics> characteristics() {
        return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.CONCURRENT, Characteristics.UNORDERED);
    }
}

And then:

List<Possibility> possibilities = getPossibilityStream()
    .parallel()
    .collect(new BestCollector<Possibility>((p1, p2) -> score(p1).compareTo(score(p2))));

That does the job in sequential mode (without the .parallel()), but in parallel mode exceptions occasionally occur in two places:

  • A java.lang.IndexOutOfBoundsException (Index: 0, Size: 0) in this line of the accumulator() method:

    final int comparison = comparator.compare(list.get(0), e);

I understand it happens when a list.clear() is called between list.isEmpty() and list.get(0).

  • A java.lang.NullPointerException in the score(Possibility) method, because the possibility is null. Again, the same line is involved:

    final int comparison = comparator.compare(list.get(0), e);

I don't understand how list.get(0) could return null...

In parallel mode, list.get(0) sometimes throws an IndexOutOfBoundsException and sometimes returns null.

I understand that my code is not thread safe so I tried several solutions:

  • Add synchronized in all methods of BestCollector: public synchronized …
  • Use a thread-safe collection instead of ArrayList: java.util.concurrent.CopyOnWriteArrayList
  • Add synchronized and use CopyOnWriteArrayList at the same time
  • Remove Characteristics.CONCURRENT from the Set<Characteristics> returned by the characteristics() method:

    @Override
    public Set<Characteristics> characteristics() {
        return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED);
    }
    

But I don't know whether Characteristics.CONCURRENT indicates that my code is thread safe or that my code will be used in concurrent processing.

But none of these solutions actually solve the problem.


In fact, when I remove CONCURRENT from the characteristics, there is sometimes a java.lang.IndexOutOfBoundsException: Index: 0, Size: 0, but this time in the line:

final int comparison = comparator.compare(l1.get(0), l2.get(0));

of the combiner() method.

However, the exceptions raised in the accumulator() method no longer seem to occur.


@Holger's answer is right.

The complete solution is to change both combiner() and characteristics() methods:

@Override
public BinaryOperator<List<E>> combiner() {
    return (l1, l2) -> {
        if (l1.isEmpty()) {
            return l2;
        } else if (l2.isEmpty()) {
            return l1;
        } else {
            final int comparison = comparator.compare(l1.get(0), l2.get(0));
            if (comparison == 0) {
                l1.addAll(l2);
                return l1;
            } else if (comparison < 0) {
                return l2;
            } else {
                return l1;
            }
        }
    };
}

@Override
public Set<Characteristics> characteristics() {
    return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED);
}

asked Apr 28 '15 at 10:04 by kwisatz


1 Answer

Your code has only one significant error: if your collector is not thread safe, it shouldn’t report Characteristics.CONCURRENT, as that is exactly the claim that it is thread safe.
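The Collector.Characteristics.CONCURRENT Javadoc describes it as a promise that the accumulator may be called concurrently, from multiple threads, on the same result container. A quick way to see that it is a claim made by the collector, not a request for parallel execution, is to ask built-in collectors what they report. A small sketch (the class and helper names are illustrative only):

```java
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class CharacteristicsDemo {
    // True if the collector claims its accumulator may be called concurrently,
    // from several threads, on one shared result container.
    static boolean isConcurrent(Collector<?, ?, ?> collector) {
        return collector.characteristics().contains(Collector.Characteristics.CONCURRENT);
    }

    public static void main(String[] args) {
        // The JDK's toList() accumulates into a non-thread-safe list: no CONCURRENT claim.
        Collector<String, ?, List<String>> toList = Collectors.toList();
        // toConcurrentMap() is documented as a concurrent, unordered collector.
        Collector<String, ?, ConcurrentMap<String, Integer>> toConcurrentMap =
                Collectors.toConcurrentMap(s -> s, String::length);

        System.out.println(isConcurrent(toList));          // false
        System.out.println(isConcurrent(toConcurrentMap)); // true
    }
}
```

Both collectors work fine with parallel streams; only the second one asks the framework to share a single container between threads.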

The important point you have to understand is that for non-CONCURRENT collectors, the framework will perform the necessary steps to use it in a thread-safe but still efficient manner:

  • for each worker thread, a new container will be acquired via the supplier()
  • each worker will use the accumulator() function together with its own local container
  • the combiner() will be used once two worker threads have finished their work
  • the finisher() will be used when all worker threads have finished their work and all containers have been combined
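The four steps above can be replayed sequentially for two hypothetical workers, each handling one half of the input. This is only a sketch of the protocol for a non-CONCURRENT collector (the helper name collectInTwoHalves is made up); the real framework splits the input recursively and runs the halves on different threads:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class ProtocolDemo {
    // Mimics what a parallel stream does with a non-CONCURRENT collector
    // when the input is split into two halves handled by two workers.
    static <T, A, R> R collectInTwoHalves(Collector<T, A, R> collector,
                                          List<T> left, List<T> right) {
        BiConsumer<A, T> accumulator = collector.accumulator();

        // 1. each worker gets its own fresh container from the supplier
        A leftContainer = collector.supplier().get();
        A rightContainer = collector.supplier().get();

        // 2. each worker accumulates only into its own container
        for (T t : left) accumulator.accept(leftContainer, t);
        for (T t : right) accumulator.accept(rightContainer, t);

        // 3. once both workers are done, their containers are combined
        A combined = collector.combiner().apply(leftContainer, rightContainer);

        // 4. after everything has been combined, the finisher produces the result
        return collector.finisher().apply(combined);
    }

    public static void main(String[] args) {
        List<String> result = collectInTwoHalves(
                Collectors.toList(), Arrays.asList("a", "b"), Arrays.asList("c"));
        System.out.println(result); // [a, b, c]
    }
}
```

No container is ever touched by two workers at the same time, which is why a non-CONCURRENT collector needs no locking.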

So all you have to do is ensure that your supplier truly returns a new instance on each invocation, that all functions are non-interfering and side-effect free (with respect to anything other than the containers they receive as arguments), and, of course, that you do not report Characteristics.CONCURRENT when your collector isn’t a concurrent collector.

You need neither the synchronized keyword nor concurrent collections here.
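Putting these rules together, the question's collector can also be written with Collector.of, using a plain ArrayList, no synchronization, and no CONCURRENT flag. This is a sketch, not the asker's class: the names BestOf and best are hypothetical, and a score by last three digits stands in for score(Possibility):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.IntStream;

public class BestOf {
    // Hypothetical helper: collects every element that compares equal to the maximum.
    // It does not report CONCURRENT, so the stream framework gives each worker its own
    // ArrayList and no synchronization or concurrent collection is needed.
    static <E> Collector<E, ?, List<E>> best(Comparator<? super E> comparator) {
        return Collector.of(
                ArrayList::new, // a fresh container on every supplier call
                (List<E> list, E e) -> {
                    if (list.isEmpty()) {
                        list.add(e);
                        return;
                    }
                    int cmp = comparator.compare(list.get(0), e);
                    if (cmp < 0) {          // e beats the current best: start over
                        list.clear();
                        list.add(e);
                    } else if (cmp == 0) {  // e ties with the current best
                        list.add(e);
                    }
                },
                (l1, l2) -> {
                    if (l1.isEmpty()) return l2; // a worker may have seen no elements
                    if (l2.isEmpty()) return l1;
                    int cmp = comparator.compare(l1.get(0), l2.get(0));
                    if (cmp < 0) return l2;
                    if (cmp > 0) return l1;
                    l1.addAll(l2);               // equal best scores: keep both groups
                    return l1;
                },
                Collector.Characteristics.UNORDERED); // deliberately not CONCURRENT
    }

    public static void main(String[] args) {
        // Score each number by its last three digits; the best score, 999, is reached
        // by 999, 1999, ..., 999999, i.e. 1000 numbers.
        Comparator<Integer> byLastThreeDigits = Comparator.comparingInt(i -> i % 1000);
        List<Integer> top = IntStream.range(0, 1_000_000).boxed()
                .parallel()
                .collect(best(byLastThreeDigits));
        System.out.println(top.size()); // 1000
    }
}
```

Collector.of adds IDENTITY_FINISH automatically when no finisher is given, so only UNORDERED needs to be passed.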


By the way, a Comparator of the form (p1, p2) -> score(p1).compareTo(score(p2)) can be implemented using Comparator.comparing(p -> score(p)) or if the score value is an int: Comparator.comparingInt(p -> score(p)).
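To check that the three forms order elements the same way, here is a sketch in which a hypothetical score(String) returning the string length (an Integer, like the question's score) stands in for score(Possibility):

```java
import java.util.Comparator;

public class ComparatorDemo {
    // Hypothetical stand-in for the question's Integer score(Possibility).
    static Integer score(String s) {
        return s.length();
    }

    // The hand-written form from the question.
    static final Comparator<String> HAND_WRITTEN = (a, b) -> score(a).compareTo(score(b));
    // Boxed key extractor: compares the Integer keys with compareTo.
    static final Comparator<String> COMPARING = Comparator.comparing(s -> score(s));
    // Primitive key extractor: the Integer is unboxed and compared as an int.
    static final Comparator<String> COMPARING_INT = Comparator.comparingInt(s -> score(s));

    // True if all three comparators order a and b the same way.
    static boolean agree(String a, String b) {
        int expected = Integer.signum(HAND_WRITTEN.compare(a, b));
        return expected == Integer.signum(COMPARING.compare(a, b))
                && expected == Integer.signum(COMPARING_INT.compare(a, b));
    }

    public static void main(String[] args) {
        System.out.println(agree("a", "bb"));  // true
        System.out.println(agree("bb", "a"));  // true
        System.out.println(agree("ab", "cd")); // true
    }
}
```

comparingInt avoids calling compareTo on boxed keys, which is the main reason to prefer it when the score is numeric.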


Finally, your combiner function does not check whether one of the lists is empty. A container stays empty when the part of the stream processed by a worker contributes no elements, so this perfectly explains an IndexOutOfBoundsException within the combiner, while the IndexOutOfBoundsException within the accumulator is the result of your collector reporting Characteristics.CONCURRENT.
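Empty containers are easy to provoke: filter a parallel stream down to a single element and count how often the combiner receives an empty list. A sketch (class and method names are made up; the exact count depends on how the JDK splits the stream, but it is typically greater than zero):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.IntStream;

public class EmptyContainerDemo {
    // Counts how often the combiner receives at least one empty list while
    // collecting a parallel stream in which only a single element survives the filter.
    static int countEmptyCombines() {
        AtomicInteger emptyCombines = new AtomicInteger(); // combiner may run on several threads
        Collector<Integer, List<Integer>, List<Integer>> toListCountingEmpties = Collector.of(
                ArrayList::new,
                List::add,
                (l1, l2) -> {
                    if (l1.isEmpty() || l2.isEmpty()) {
                        emptyCombines.incrementAndGet();
                    }
                    l1.addAll(l2);
                    return l1;
                });
        List<Integer> result = IntStream.range(0, 1_000).boxed()
                .parallel()
                .filter(i -> i == 999) // every chunk that does not contain 999 stays empty
                .collect(toListCountingEmpties);
        if (result.size() != 1 || result.get(0) != 999) {
            throw new IllegalStateException("unexpected result: " + result);
        }
        return emptyCombines.get();
    }

    public static void main(String[] args) {
        System.out.println("combines with an empty list: " + countEmptyCombines());
    }
}
```

With the question's original combiner, each of those calls would have reached l1.get(0) or l2.get(0) on an empty list.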


It’s also important to understand that adding a synchronized keyword to an accumulator() or combiner() method does not guard the function constructed via lambda expression. It will guard the method which constructs the function instance, but not the function’s code itself. In contrast to an inner class, there is no way to add a synchronized keyword to the actual function’s implementation method.
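Thread.holdsLock makes this visible. The sketch below (hypothetical class and method names) only illustrates where a lock applies; as explained above, locking is not the right fix for the collector itself:

```java
import java.util.function.BooleanSupplier;

public class SyncDemo {
    // 'synchronized' only guards this factory method while it builds the lambda;
    // the lock is released as soon as the method returns.
    synchronized BooleanSupplier fromSynchronizedFactory() {
        return () -> Thread.holdsLock(this); // 'this' is the SyncDemo instance
    }

    // An anonymous inner class can mark its own implementation method synchronized,
    // so the lock on the anonymous instance is held while getAsBoolean() runs.
    BooleanSupplier fromSynchronizedInnerClass() {
        return new BooleanSupplier() {
            @Override
            public synchronized boolean getAsBoolean() {
                return Thread.holdsLock(this); // 'this' is the anonymous instance
            }
        };
    }

    // Guarding a lambda body requires an explicit synchronized block inside it.
    BooleanSupplier withSynchronizedBlock() {
        return () -> {
            synchronized (this) {
                return Thread.holdsLock(this);
            }
        };
    }

    public static void main(String[] args) {
        SyncDemo demo = new SyncDemo();
        System.out.println(demo.fromSynchronizedFactory().getAsBoolean());    // false
        System.out.println(demo.fromSynchronizedInnerClass().getAsBoolean()); // true
        System.out.println(demo.withSynchronizedBlock().getAsBoolean());      // true
    }
}
```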

answered Oct 19 '22 at 15:10 by Holger