Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Do parallel streams treat upstream iterators in a thread safe way?

Today I was using a stream that was performing a parallel() operation after a map, however; the underlying source is an iterator which is not thread safe which is similar to the BufferedReader.lines implementation.

I originally thought that trySplit would be called on the created thread, however; I observed that the accesses to the iterator have come from multiple threads.

By example, the following silly iterator implementation is just setup with enough elements to cause splitting and also keeps track of the unique threads that accessed the hasNext method.

class SillyIterator implements Iterator<String> {

    private final ArrayDeque<String> src =
        IntStream.range(1, 10000)
            .mapToObj(Integer::toString)
            .collect(toCollection(ArrayDeque::new));
    private Map<String, String> ts = new ConcurrentHashMap<>();
    public Set<String> threads() { return ts.keySet(); }
    private String nextRecord = null;

    @Override
    public boolean hasNext() {
        var n = Thread.currentThread().getName();
        ts.put(n, n);
        if (nextRecord != null) {
            return true;
        } else {
            nextRecord = src.poll();
            return nextRecord != null;
        }
    }
    @Override
    public String next() {
        if (nextRecord != null || hasNext()) {
            var rec = nextRecord;
            nextRecord = null;
            return rec;
        }
        throw new NoSuchElementException();
    }

}

Using this to create a stream as follows:

var iter = new SillyIterator();
StreamSupport
    .stream(Spliterators.spliteratorUnknownSize(
        iter, Spliterator.ORDERED | Spliterator.NONNULL
    ), false)
    .map(n -> "value = " + n)
    .parallel()
    .collect(toList());

System.out.println(iter.threads());

This on my system output the two fork join threads as well as the main thread, which kind of scared me.

[ForkJoinPool.commonPool-worker-1, ForkJoinPool.commonPool-worker-2, main]
like image 937
Brett Ryan Avatar asked Dec 12 '21 12:12

Brett Ryan


People also ask

Are parallel stream thread safe?

Parallel streams provide the capability of parallel processing over collections that are not thread-safe. It is although required that one does not modify the collection during the parallel processing.

Is parallel stream multithreading?

In case of Parallel stream,4 threads are spawned simultaneously and it internally using Fork and Join pool to create and manage threads.

Are streams thread safe?

All access to the Stream object will be thread safe.


1 Answers

Thread safety does not necessarily imply being accessed by only one thread. The important aspect is that there is no concurrent access, i.e. no access by more than one thread at the same time. If the access by different threads is temporally ordered and this ordering also ensures the necessary memory visibility, which is the responsibility of the caller, it still is a thread safe usage.

The Spliterator documentation says:

Despite their obvious utility in parallel algorithms, spliterators are not expected to be thread-safe; instead, implementations of parallel algorithms using spliterators should ensure that the spliterator is only used by one thread at a time. This is generally easy to attain via serial thread-confinement, which often is a natural consequence of typical parallel algorithms that work by recursive decomposition.

The spliterator doesn’t need to be confined to the same thread throughout its lifetime, but there should be a clear handover at the caller’s side ensuring that the old thread stops using it before the new thread starts using it.

But the important takeaway is, the spliterator doesn’t need to be thread safe, hence, the iterator wrapped by a spliterator also doesn’t need to be thread safe.

Note that a typical behavior is splitting and handing over before starting traversal, but since an ordinary Iterator doesn’t support splitting, the wrapping spliterator has to iterate and buffer elements to implement splitting. Therefore, the Iterator experiences traversal by different threads (but one at a time) when the traversal has not been started from the Stream implementation’s perspective.


That said, the lines() implementation of BufferedReader is a bad example which you should not follow. Since it’s centered around a single readLine() call, it would be natural to implement Spliterator directly instead of implementing a more complicated Iterator and have it wrapped via spliteratorUnknownSize(…).

Since your example is likewise centered around a single poll() call, it’s also straight-forward to implement Spliterator directly:

class SillySpliterator extends Spliterators.AbstractSpliterator<String> {
    private final ArrayDeque<String> src = IntStream.range(1, 10000)
        .mapToObj(Integer::toString).collect(toCollection(ArrayDeque::new));

    SillySpliterator() {
        super(Long.MAX_VALUE, ORDERED | NONNULL);
    }

    @Override
    public boolean tryAdvance(Consumer<? super String> action) {
        String nextRecord = src.poll();
        if(nextRecord == null) return false;
        action.accept(nextRecord);
        return true;
    }
}

Depending on your real life case, you may also pass the actual deque size to the constructor and provide the SIZED characteristic.

Then, you may use it like

var result = StreamSupport.stream(new SillySpliterator(), true)
    .map(n -> "value = " + n)
    .collect(toList());
like image 146
Holger Avatar answered Oct 23 '22 14:10

Holger