I am reading Java 8 in Action, and the author references this link: http://mail.openjdk.java.net/pipermail/lambda-dev/2013-November/011516.html
He then writes his own stream forker, which looks like this:
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class Main {

    public static void main(String... args) {
        List<Person> people = Arrays.asList(new Person(23, "Paul"), new Person(24, "Nastya"), new Person(30, "Unknown"));
        StreamForker<Person> forker = new StreamForker<>(people.stream())
                .fork("All names", s -> s.map(Person::getName).collect(Collectors.joining(", ")))
                .fork("Age stats", s -> s.collect(Collectors.summarizingInt(Person::getAge)))
                .fork("Oldest", s -> s.reduce((p1, p2) -> p1.getAge() > p2.getAge() ? p1 : p2).get());
        Results results = forker.getResults();
        String allNames = results.get("All names");
        IntSummaryStatistics stats = results.get("Age stats");
        Person oldest = results.get("Oldest");
        System.out.println(allNames);
        System.out.println(stats);
        System.out.println(oldest);
    }

    interface Results {
        <R> R get(Object key);
    }

    static class StreamForker<T> {
        private final Stream<T> stream;
        private final Map<Object, Function<Stream<T>, ?>> forks = new HashMap<>();

        public StreamForker(Stream<T> stream) {
            this.stream = stream;
        }

        public StreamForker<T> fork(Object key, Function<Stream<T>, ?> f) {
            forks.put(key, f);
            return this;
        }

        public Results getResults() {
            ForkingStreamConsumer<T> consumer = build();
            try {
                stream.sequential().forEach(consumer);
            } finally {
                consumer.finish();
            }
            return consumer;
        }

        private ForkingStreamConsumer<T> build() {
            List<BlockingQueue<T>> queues = new ArrayList<>();
            Map<Object, Future<?>> actions =
                    forks.entrySet().stream().reduce(
                            new HashMap<>(),
                            (map, e) -> {
                                map.put(e.getKey(),
                                        getOperationResult(queues, e.getValue()));
                                return map;
                            },
                            (m1, m2) -> {
                                m1.putAll(m2);
                                return m1;
                            }
                    );
            return new ForkingStreamConsumer<>(queues, actions);
        }

        private Future<?> getOperationResult(List<BlockingQueue<T>> queues,
                                             Function<Stream<T>, ?> f) {
            BlockingQueue<T> queue = new LinkedBlockingQueue<>();
            queues.add(queue);
            Spliterator<T> spliterator = new BlockingQueueSpliterator<>(queue);
            Stream<T> source = StreamSupport.stream(spliterator, false);
            return CompletableFuture.supplyAsync(() -> f.apply(source));
        }
    }

    static class ForkingStreamConsumer<T> implements Results, Consumer<T> {
        static final Object END_OF_STREAM = new Object();
        private final List<BlockingQueue<T>> queues;
        private final Map<Object, Future<?>> actions;

        ForkingStreamConsumer(List<BlockingQueue<T>> queues,
                              Map<Object, Future<?>> actions) {
            this.queues = queues;
            this.actions = actions;
        }

        public void finish() {
            accept((T) END_OF_STREAM);
        }

        @Override
        public <R> R get(Object key) {
            try {
                return ((Future<R>) actions.get(key)).get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void accept(T t) {
            queues.forEach(q -> q.add(t));
        }
    }

    static class BlockingQueueSpliterator<T> implements Spliterator<T> {
        private final BlockingQueue<T> q;

        public BlockingQueueSpliterator(BlockingQueue<T> q) {
            this.q = q;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            T t;
            while (true) {
                try {
                    t = q.take();
                    break;
                } catch (InterruptedException e) {
                    // Interrupted while waiting: ignore and retry take().
                }
            }
            if (t != ForkingStreamConsumer.END_OF_STREAM) {
                action.accept(t);
                return true;
            }
            return false;
        }

        @Override
        public Spliterator<T> trySplit() {
            return null;
        }

        @Override
        public long estimateSize() {
            return 0;
        }

        @Override
        public int characteristics() {
            return 0;
        }
    }

    static class Person {
        private int age;
        private String name;

        public Person(int age, String name) {
            this.age = age;
            this.name = name;
        }

        public int getAge() {
            return age;
        }

        public String getName() {
            return name;
        }

        @Override
        public String toString() {
            return String.format("Age: %d, name: %s", age, name);
        }
    }
}
How the code written by the author works:
First, we create a StreamForker out of a stream. Then we fork three operations, declaring what we want to do on that stream in parallel. In our case, the data model is the Person{age, name} class and we want to perform three actions: join all names into one string, compute summary statistics of the ages, and find the oldest person.
We then call forker.getResults(), which applies a ForkingStreamConsumer to the stream, spreading its elements into three blocking queues, which are then turned into three streams and processed in parallel.
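To make the mechanism above concrete, here is a stripped-down sketch of the same idea without the Stream plumbing: one pass over the source copies each element into one queue per operation, and each queue is drained by its own asynchronous task until a sentinel arrives. The class name FanOutSketch, the helper consume, and the choice of sum and max as the two operations are my own assumptions for illustration, not code from the book.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.IntBinaryOperator;

public class FanOutSketch {
    // Sentinel marking the end of the input, playing the role of END_OF_STREAM.
    private static final Object END = new Object();

    // Hypothetical helper: starts an async task that folds everything arriving on the queue.
    static CompletableFuture<Integer> consume(BlockingQueue<Object> queue,
                                              int identity,
                                              IntBinaryOperator op) {
        return CompletableFuture.supplyAsync(() -> {
            int acc = identity;
            try {
                // Block until the next element arrives; stop at the sentinel.
                for (Object o = queue.take(); o != END; o = queue.take()) {
                    acc = op.applyAsInt(acc, (Integer) o);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return acc;
        });
    }

    // Returns {sum, max} of the ages, reading the source list exactly once.
    static int[] run(List<Integer> ages) {
        BlockingQueue<Object> sumQueue = new LinkedBlockingQueue<>();
        BlockingQueue<Object> maxQueue = new LinkedBlockingQueue<>();
        CompletableFuture<Integer> sum = consume(sumQueue, 0, Integer::sum);
        CompletableFuture<Integer> max = consume(maxQueue, Integer.MIN_VALUE, Math::max);

        // Single pass over the source: every element goes to every queue.
        for (Integer age : ages) {
            sumQueue.add(age);
            maxQueue.add(age);
        }
        // Tell both consumers that no more elements will arrive.
        sumQueue.add(END);
        maxQueue.add(END);

        return new int[] { sum.join(), max.join() };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run(Arrays.asList(23, 24, 30))));
    }
}
```

With the ages 23, 24 and 30 from the example, run returns the sum 77 and the maximum 30. The StreamForker does the same thing, except that each queue is wrapped in a Spliterator so the consumers can be written as ordinary stream pipelines.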
My question is: does this approach have any advantage over simply doing the following?
Future<String> allNames2 =
        CompletableFuture.supplyAsync(() -> people.stream().map(Person::getName).collect(Collectors.joining(", ")));
Future<IntSummaryStatistics> stats2 =
        CompletableFuture.supplyAsync(() -> people.stream().collect(Collectors.summarizingInt(Person::getAge)));
Future<Person> oldest2 =
        CompletableFuture.supplyAsync(() -> people.stream().reduce((p1, p2) -> p1.getAge() > p2.getAge() ? p1 : p2).get());
For me, this doesn't make much sense with an in-memory list such as an ArrayList as the stream source.
If the stream source is a big file that you process with
StreamForker<Person> forker = new StreamForker<>(
        java.nio.file.Files.lines(Paths.get("somepath"))
                .map(Person::new))
        .fork(...)
then it could prove beneficial, since you would process the whole file only once, whereas with three separate calls to Files.lines(...) you would read the file three times.
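As a rough illustration of the "three times" point, the sketch below counts how often the source elements are visited when three independent pipelines are built from the same source. It is only an assumption-laden stand-in: an in-memory list of "age,name" strings replaces the file, a peek counter replaces real I/O, and the class name SourceReadCount is hypothetical.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SourceReadCount {

    // Returns how many source elements were visited across three separate pipelines.
    static int countReads() {
        AtomicInteger reads = new AtomicInteger();
        // Stand-in for the lines of a file (assumed "age,name" format).
        List<String> lines = Arrays.asList("23,Paul", "24,Nastya", "30,Unknown");

        // Each call opens a fresh stream over the source, like a new Files.lines(...) call.
        Supplier<Stream<String>> source =
                () -> lines.stream().peek(line -> reads.incrementAndGet());

        // Pipeline 1: join all names.
        String names = source.get()
                .map(line -> line.split(",")[1])
                .collect(Collectors.joining(", "));

        // Pipeline 2: age statistics.
        IntSummaryStatistics stats = source.get()
                .mapToInt(line -> Integer.parseInt(line.split(",")[0]))
                .summaryStatistics();

        // Pipeline 3: the line with the highest age.
        String oldest = source.get()
                .max(Comparator.comparingInt((String line) -> Integer.parseInt(line.split(",")[0])))
                .get();

        System.out.println(names + " | " + stats.getMax() + " | " + oldest);
        return reads.get();
    }

    public static void main(String[] args) {
        System.out.println("elements visited: " + countReads());
    }
}
```

With three elements and three pipelines the counter ends at 9, that is, every element is visited once per pipeline. For a list that is cheap, but for a large file each visit means reading the data again, which is the cost the forker avoids by traversing the source once.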