The goal is to process a continuous stream of elements with Java 8 streams. To do so, elements are added to the data source of a parallel stream while that stream is being processed.
The Javadoc of the java.util.stream package states the following in its section "Non-interference":
For most data sources, preventing interference means ensuring that the data source is not modified at all during the execution of the stream pipeline. The notable exception to this are streams whose sources are concurrent collections, which are specifically designed to handle concurrent modification. Concurrent stream sources are those whose Spliterator reports the CONCURRENT characteristic.
That is why our attempts use a ConcurrentLinkedQueue, for which the following expression returns true:
new ConcurrentLinkedQueue<Integer>().spliterator().hasCharacteristics(Spliterator.CONCURRENT)
The documentation does not explicitly say that the data source must not be modified when it is used in a parallel stream.
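The characteristic can also be queried directly to see which sources qualify. The following sketch (the class and method names are hypothetical, chosen for illustration) compares a ConcurrentLinkedQueue with an ArrayList, whose spliterator does not report CONCURRENT, so modifying an ArrayList during a stream pipeline counts as interference:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Spliterator;
import java.util.concurrent.ConcurrentLinkedQueue;

public class ConcurrentCharacteristicCheck {

    // Returns true if the source's spliterator reports the CONCURRENT characteristic
    static boolean reportsConcurrent(Iterable<?> source) {
        return source.spliterator().hasCharacteristics(Spliterator.CONCURRENT);
    }

    public static void main(String[] args) {
        Queue<Integer> queue = new ConcurrentLinkedQueue<>();
        List<Integer> list = new ArrayList<>();
        // ConcurrentLinkedQueue is designed for concurrent modification: prints true
        System.out.println("ConcurrentLinkedQueue: " + reportsConcurrent(queue));
        // ArrayList is not: prints false
        System.out.println("ArrayList: " + reportsConcurrent(list));
    }
}
```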
In our example, for each element in the stream, the incremented counter value is added to the queue (the stream's data source) until the counter exceeds N. With queue.stream(), i.e. sequential execution, everything works fine:
import static org.junit.Assert.assertEquals;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class StreamTest {
    public static void main(String[] args) {
        final int N = 10000;
        assertEquals(N, testSequential(N));
    }

    public static int testSequential(int N) {
        final AtomicInteger counter = new AtomicInteger(0);
        final AtomicInteger check = new AtomicInteger(0);
        final Queue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
        // Seed the queue with the first N / 10 counter values
        for (int i = 0; i < N / 10; ++i) {
            queue.add(counter.incrementAndGet());
        }
        Stream<Integer> stream = queue.stream();
        stream.forEach(i -> {
            System.out.println(i);
            int j = counter.incrementAndGet();
            check.incrementAndGet();
            // Feed a new element back into the stream's source until the counter exceeds N
            if (j <= N) {
                queue.add(j);
            }
        });
        stream.close();
        return check.get();
    }
}
In a second attempt the stream is parallel, and the test fails with a java.lang.AssertionError because check is smaller than N: not every element added to the queue was processed. The stream may have finished early because the queue may have become empty at some point in time.
import static org.junit.Assert.assertEquals;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class StreamTest {
    public static void main(String[] args) {
        final int N = 10000;
        assertEquals(N, testParallel1(N));
    }

    public static int testParallel1(int N) {
        final AtomicInteger counter = new AtomicInteger(0);
        final AtomicInteger check = new AtomicInteger(0);
        final Queue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
        // Seed the queue with the first N / 10 counter values
        for (int i = 0; i < N / 10; ++i) {
            queue.add(counter.incrementAndGet());
        }
        // Same as testSequential, but the stream is now parallel
        Stream<Integer> stream = queue.parallelStream();
        stream.forEach(i -> {
            System.out.println(i);
            int j = counter.incrementAndGet();
            check.incrementAndGet();
            if (j <= N) {
                queue.add(j);
            }
        });
        stream.close();
        return check.get();
    }
}
The next attempt was to signal the main thread once the continuous stream has ‘really’ ended (the queue is empty) and to close the stream object afterwards. The problem here is that the stream appears to read elements from the queue only once, or at least not continuously, so it never reaches the ‘real’ end of the stream.
import static org.junit.Assert.assertEquals;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;

public class StreamTest {
    public static void main(String[] args) {
        final int N = 10000;
        try {
            assertEquals(N, testParallel2(N));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static int testParallel2(int N) throws InterruptedException {
        final Lock lock = new ReentrantLock();
        final Condition cond = lock.newCondition();
        final AtomicInteger counter = new AtomicInteger(0);
        final AtomicInteger check = new AtomicInteger(0);
        final Queue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
        for (int i = 0; i < N / 10; ++i) {
            queue.add(counter.incrementAndGet());
        }
        Stream<Integer> stream = queue.parallelStream();
        // forEach is a blocking terminal operation: it returns only once traversal has finished
        stream.forEach(i -> {
            System.out.println(i);
            int j = counter.incrementAndGet();
            lock.lock();
            check.incrementAndGet();
            if (j <= N) {
                queue.add(j);
            } else {
                // Counter has passed N: wake up the waiting main thread
                cond.signal();
            }
            lock.unlock();
        });
        // Wait until N elements have been counted
        lock.lock();
        while (check.get() < N) {
            cond.await();
        }
        lock.unlock();
        stream.close();
        return check.get();
    }
}
This raises the following questions:
There is a significant difference between “modifying the source of the Stream does not break it” and your assumption that “modifications will be reflected by the ongoing Stream operation”.
The CONCURRENT property implies that modifying the source is permitted, i.e. that it will never throw a ConcurrentModificationException, but it does not imply that you can rely on any specific behavior regarding whether these changes are reflected or not.
The documentation of the CONCURRENT flag itself says:
Most concurrent collections maintain a consistency policy guaranteeing accuracy with respect to elements present at the point of Spliterator construction, but possibly not reflecting subsequent additions or removals.
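This consistency policy can be observed even in a single thread. The sketch below (hypothetical class and method names) creates a spliterator over a ConcurrentLinkedQueue holding 1 to 5, appends 6 to 10, and only then traverses it. Only the elements present at construction are guaranteed to be reported exactly once; whether the later additions show up is left to the implementation, so the code deliberately does not rely on either outcome:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.Spliterator;
import java.util.concurrent.ConcurrentLinkedQueue;

public class ConsistencyPolicyDemo {

    // Creates a spliterator over a queue holding 1..5, then appends 6..10 before traversing
    static List<Integer> traverseAfterModification() {
        Queue<Integer> queue = new ConcurrentLinkedQueue<>();
        for (int i = 1; i <= 5; i++) {
            queue.add(i);
        }
        Spliterator<Integer> spliterator = queue.spliterator();

        // Modify the source after the spliterator has been constructed
        for (int i = 6; i <= 10; i++) {
            queue.add(i);
        }

        List<Integer> seen = new ArrayList<>();
        spliterator.forEachRemaining(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        List<Integer> seen = traverseAfterModification();
        // Guaranteed: each of 1..5 is reported exactly once
        for (int i = 1; i <= 5; i++) {
            System.out.println(i + " seen " + Collections.frequency(seen, i) + " time(s)");
        }
        // Not guaranteed either way: how many of 6..10 were reflected
        System.out.println("Later additions reflected: " + (seen.size() - 5));
    }
}
```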
This Stream behavior is consistent with the already known behavior of ConcurrentLinkedQueue:
Iterators are weakly consistent, returning elements reflecting the state of the queue at some point at or since the creation of the iterator. They do not throw ConcurrentModificationException, and may proceed concurrently with other operations. Elements contained in the queue since the creation of the iterator will be returned exactly once.
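The difference between “permitted” and “reflected” becomes visible when the same modification is applied to a non-concurrent collection. In this sketch (hypothetical names), adding an element during iteration makes the fail-fast ArrayList iterator throw ConcurrentModificationException, while the weakly consistent ConcurrentLinkedQueue iterator simply carries on:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;

public class WeakConsistencyDemo {

    // Iterates over the collection and adds an element mid-iteration.
    // Returns true if the loop finished, false if a ConcurrentModificationException was thrown.
    static boolean iterateWhileAdding(Collection<Integer> source) {
        source.add(1);
        source.add(2);
        try {
            Iterator<Integer> it = source.iterator();
            while (it.hasNext()) {
                if (it.next() == 1) {
                    source.add(3); // structural modification during iteration
                }
            }
            return true;
        } catch (ConcurrentModificationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // ArrayList's iterator is fail-fast: prints false
        System.out.println("ArrayList: " + iterateWhileAdding(new ArrayList<>()));
        // ConcurrentLinkedQueue's iterator is weakly consistent: prints true
        System.out.println("ConcurrentLinkedQueue: " + iterateWhileAdding(new ConcurrentLinkedQueue<>()));
    }
}
```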
It’s hard to say how to “achieve the desired behavior otherwise”, as you didn’t describe the “desired behavior” in any form other than code, which can simply be replaced with
public static int testSequential(int N) {
    return N;
}
public static int testParallel1(int N) {
    return N;
}
as that’s the only observable effect… Consider redefining your problem…
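One possible way to redefine the problem, offered here as an assumption and not something the answer itself proposes, is to stop streaming over a source that keeps growing and instead process the queue in rounds. Each round drains the queue into a fixed batch and streams over that batch in parallel; elements produced in the meantime are picked up by the next round. The class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class BatchedStreamSketch {

    // Hypothetical alternative: process the queue in rounds instead of one growing stream
    static int processInRounds(int n) {
        final AtomicInteger counter = new AtomicInteger(0);
        final AtomicInteger check = new AtomicInteger(0);
        final Queue<Integer> queue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < n / 10; ++i) {
            queue.add(counter.incrementAndGet());
        }

        while (!queue.isEmpty()) {
            // Drain the current contents; poll() hands out each element exactly once
            List<Integer> batch = new ArrayList<>();
            Integer next;
            while ((next = queue.poll()) != null) {
                batch.add(next);
            }

            // The batch is never modified while it is streamed, so there is no interference;
            // new work goes into the queue, not into the stream's source
            batch.parallelStream().forEach(element -> {
                int j = counter.incrementAndGet();
                check.incrementAndGet();
                if (j <= n) {
                    queue.add(j);
                }
            });
            // forEach blocks until the whole batch has been processed
        }
        return check.get();
    }

    public static void main(String[] args) {
        System.out.println(processInRounds(10000)); // prints 10000
    }
}
```

Unlike testParallel1, every element is processed, because the loop only stops once a fully processed round leaves the queue empty. The trade-off is a synchronization point at the end of each round, so parallelism drops while a round winds down.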