Adding elements to Java 8 parallel Streams on-the-fly

The goal is to process a continuous stream of elements with the help of Java 8 streams. To that end, elements are added to the data source of a parallel stream while that stream is being processed.

The Javadoc of the java.util.stream package describes the following in the section "Non-interference":

For most data sources, preventing interference means ensuring that the data source is not modified at all during the execution of the stream pipeline. The notable exception to this are streams whose sources are concurrent collections, which are specifically designed to handle concurrent modification. Concurrent stream sources are those whose Spliterator reports the CONCURRENT characteristic.

That is why our attempts use a ConcurrentLinkedQueue: its spliterator reports this characteristic, i.e. the following returns true:

new ConcurrentLinkedQueue<Integer>().spliterator().hasCharacteristics(Spliterator.CONCURRENT)
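For contrast, the same check can be run against an ordinary collection. In the small sketch below (the class and method names are illustrative), ConcurrentLinkedQueue reports CONCURRENT, while ArrayList, whose spliterator reports ORDERED, SIZED and SUBSIZED, does not:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Spliterator;
import java.util.concurrent.ConcurrentLinkedQueue;

public class CharacteristicsCheck {
    // True if the collection's spliterator reports the CONCURRENT characteristic,
    // i.e. the collection is designed to be modified while it is being traversed.
    static boolean isConcurrentSource(Collection<?> c) {
        return c.spliterator().hasCharacteristics(Spliterator.CONCURRENT);
    }

    public static void main(String[] args) {
        // Concurrent collection: modification during traversal is permitted.
        System.out.println(isConcurrentSource(new ConcurrentLinkedQueue<Integer>())); // true
        // Ordinary collection: modification during traversal counts as interference.
        System.out.println(isConcurrentSource(new ArrayList<Integer>()));             // false
    }
}
```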

The documentation does not explicitly say that the data source must not be modified when it is used in a parallel stream.

In our example, for each element of the stream, the incremented counter value is added to the queue (the data source of the stream) until the counter exceeds N. With queue.stream(), sequential execution works fine:

import static org.junit.Assert.assertEquals;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class StreamTest {
    public static void main(String[] args) {
        final int N = 10000;
        assertEquals(N, testSequential(N));
    }

    public static int testSequential(int N) {
        final AtomicInteger counter = new AtomicInteger(0);
        final AtomicInteger check = new AtomicInteger(0);
        final Queue<Integer> queue = new ConcurrentLinkedQueue<Integer>();

        for (int i = 0; i < N / 10; ++i) {
            queue.add(counter.incrementAndGet());
        }

        Stream<Integer> stream = queue.stream();
        stream.forEach(i -> {
            System.out.println(i);

            int j = counter.incrementAndGet();

            check.incrementAndGet();
            if (j <= N) {
                queue.add(j);
            }
        });
        stream.close();
        return check.get();
    }
}

In a second attempt, the stream is parallel, and the test fails with a java.lang.AssertionError: check is smaller than N, so not every element in the queue was processed. The stream may have finished early because the queue might have become empty at some point.

import static org.junit.Assert.assertEquals;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class StreamTest {
    public static void main(String[] args) {
        final int N = 10000;
        assertEquals(N, testParallel1(N));
    }

    public static int testParallel1(int N) {
        final AtomicInteger counter = new AtomicInteger(0);
        final AtomicInteger check = new AtomicInteger(0);
        final Queue<Integer> queue = new ConcurrentLinkedQueue<Integer>();

        for (int i = 0; i < N / 10; ++i) {
            queue.add(counter.incrementAndGet());
        }

        Stream<Integer> stream = queue.parallelStream();
        stream.forEach(i -> {
            System.out.println(i);

            int j = counter.incrementAndGet();

            check.incrementAndGet();
            if (j <= N) {
                queue.add(j);
            }
        });
        stream.close();
        return check.get();
    }
}

The next attempt was to signal the main thread once the continuous stream had ‘really’ ended (the queue is empty) and to close the stream object afterwards. Here the problem is that the stream object appears to read elements from the queue only once, or at least not continuously, and never reaches the ‘real’ end of the stream.

import static org.junit.Assert.assertEquals;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;

public class StreamTest {

    public static void main(String[] args) {
        final int N = 10000;
        try {
            assertEquals(N, testParallel2(N));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static int testParallel2(int N) throws InterruptedException {
        final Lock lock = new ReentrantLock();
        final Condition cond = lock.newCondition();

        final AtomicInteger counter = new AtomicInteger(0);
        final AtomicInteger check = new AtomicInteger(0);
        final Queue<Integer> queue = new ConcurrentLinkedQueue<Integer>();

        for (int i = 0; i < N / 10; ++i) {
            queue.add(counter.incrementAndGet());
        }

        Stream<Integer> stream = queue.parallelStream();
        stream.forEach(i -> {
            System.out.println(i);

            int j = counter.incrementAndGet();

            lock.lock();
            check.incrementAndGet();
            if (j <= N) {
                queue.add(j);
            } else {
                cond.signal();
            }
            lock.unlock();
        });

        lock.lock();
        while (check.get() < N) {
            cond.await();
        }
        lock.unlock();
        stream.close();
        return check.get();
    }
}
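One detail that may help explain why this third attempt never finishes: forEach is a blocking terminal operation, so it returns only after every action it invoked has completed. By the time the main thread reaches the await loop, the stream has already stopped; if check is still below N at that point, the loop waits for a cond.signal() that can no longer arrive. A minimal illustration of the blocking behavior (class and method names are illustrative):

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class ForEachBlocks {
    // Runs a parallel forEach and reports how many actions had completed
    // by the time the terminal operation returned to the caller.
    static int completedWhenForEachReturns(int n) {
        AtomicInteger processed = new AtomicInteger();
        IntStream.range(0, n).parallel().forEach(i -> processed.incrementAndGet());
        // forEach blocks until all actions are done, so none is still running here.
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(completedWhenForEachReturns(1000)); // 1000
    }
}
```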

This raises the following questions:

  • Did we do something wrong?
  • Is it an unspecified or even wrong usage of the Stream API?
  • How can we achieve the desired behavior otherwise?
asked Oct 17 '16 at 11:10 by L.M. Hackl


1 Answer

There is a significant difference between “modifying the source of the Stream does not break it” and your assumption “modifications will be reflected by the ongoing Stream operation”.

The CONCURRENT property implies that the modification of the source is permitted, i.e. that it will never throw a ConcurrentModificationException, but it does not imply that you can rely on a specific behavior regarding whether these changes are reflected or not.
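The distinction can be observed even with a sequential stream. In the sketch below (class and method names are illustrative), adding to an ArrayList while streaming over it fails with a ConcurrentModificationException on OpenJDK (ArrayList's fail-fast detection is documented as best-effort only), whereas the same modification of a ConcurrentLinkedQueue is permitted. Whether the added elements are also visited by the ongoing traversal is exactly the part that is not specified:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.concurrent.ConcurrentLinkedQueue;

public class ModificationDemo {
    // Adds elements to the source while a sequential stream traverses it and
    // reports whether the traversal failed with ConcurrentModificationException.
    static boolean failsOnModification(Collection<Integer> source) {
        try {
            source.stream().forEach(i -> {
                if (i < 3) {
                    source.add(i + 10); // modifies the stream's own source
                }
            });
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Not a CONCURRENT source: the spliterator detects the modification.
        System.out.println(failsOnModification(new ArrayList<>(Arrays.asList(1, 2, 3))));             // true
        // CONCURRENT source: the modification is permitted and no exception is thrown.
        System.out.println(failsOnModification(new ConcurrentLinkedQueue<>(Arrays.asList(1, 2, 3)))); // false
    }
}
```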

The documentation of the CONCURRENT flag itself says:

Most concurrent collections maintain a consistency policy guaranteeing accuracy with respect to elements present at the point of Spliterator construction, but possibly not reflecting subsequent additions or removals.

This Stream behavior is consistent with the already known behavior of ConcurrentLinkedQueue:

Iterators are weakly consistent, returning elements reflecting the state of the queue at some point at or since the creation of the iterator. They do not throw ConcurrentModificationException, and may proceed concurrently with other operations. Elements contained in the queue since the creation of the iterator will be returned exactly once.
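The guarantee quoted above can be written down as a check. In this sketch (class and method names are illustrative), an element is added after the iterator was created; only the elements present at creation are guaranteed to be returned, each exactly once, so nothing is assumed about whether the later element shows up:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

public class WeakConsistencyDemo {
    // Creates an iterator, modifies the queue afterwards, and collects what the
    // iterator returns. Elements 1, 2 and 3 were present at creation and are
    // guaranteed to appear exactly once; element 4 may or may not appear.
    static List<Integer> iterateWhileAdding() {
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>(Arrays.asList(1, 2, 3));
        Iterator<Integer> it = queue.iterator();
        queue.add(4); // added after the iterator was created
        List<Integer> seen = new ArrayList<>();
        while (it.hasNext()) {
            seen.add(it.next());
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(iterateWhileAdding());
    }
}
```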

It’s hard to say how to “achieve the desired behavior otherwise”, as you didn’t describe the “desired behavior” in any form other than code, which can simply be replaced with

public static int testSequential(int N) {
    return N;
}
public static int testParallel1(int N) {
    return N;
}

as that’s the only observable effect… Consider redefining your problem…
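If the desired behavior is read as “every initial element and every element generated while processing is handled exactly once, using a parallel stream”, one option that stays within what the Stream API guarantees is to process the data in rounds: each round streams over a snapshot that nobody modifies, and elements generated during the round are collected for the next one. The following is a minimal sketch under that assumption, not a proposal from the answer; the class and method names are illustrative, and the counter bookkeeping mirrors the question's tests:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundsSketch {
    // Same bookkeeping as the question's tests, but each parallel stream runs over
    // a snapshot that is never modified; new elements go to the next round's queue.
    public static int processInRounds(int n) {
        final AtomicInteger counter = new AtomicInteger(0);
        final AtomicInteger check = new AtomicInteger(0);

        List<Integer> current = new ArrayList<>();
        for (int i = 0; i < n / 10; ++i) {
            current.add(counter.incrementAndGet());
        }

        while (!current.isEmpty()) {
            // Thread-safe sink for the elements produced during this round.
            final Queue<Integer> next = new ConcurrentLinkedQueue<>();
            current.parallelStream().forEach(i -> {
                int j = counter.incrementAndGet();
                check.incrementAndGet();
                if (j <= n) {
                    next.add(j);
                }
            });
            // forEach has returned, so this round is complete; its output becomes the next snapshot.
            current = new ArrayList<>(next);
        }
        return check.get();
    }

    public static void main(String[] args) {
        System.out.println(processInRounds(10000)); // 10000
    }
}
```

Rounds introduce a barrier between generations: a newly generated element is only processed after the whole current round has finished. If elements should be picked up as soon as they are produced, a task-based design (an ExecutorService or ForkJoinPool where each task submits its follow-up work) is likely a closer fit than a Stream.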

answered Oct 06 '22 at 09:10 by Holger