
What is the advantage of forking a stream over just using multiple streams?

I am reading Java 8 in Action, where the author references this link: http://mail.openjdk.java.net/pipermail/lambda-dev/2013-November/011516.html

and writes his own stream forker, which looks like this:

import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class Main {

    public static void main(String... args) {
        List<Person> people = Arrays.asList(new Person(23, "Paul"), new Person(24, "Nastya"), new Person(30, "Unknown"));
        StreamForker<Person> forker = new StreamForker<>(people.stream())
                .fork("All names", s -> s.map(Person::getName).collect(Collectors.joining(", ")))
                .fork("Age stats", s -> s.collect(Collectors.summarizingInt(Person::getAge)))
                .fork("Oldest", s -> s.reduce((p1, p2) -> p1.getAge() > p2.getAge() ? p1 : p2).get());
        Results results = forker.getResults();

        String allNames = results.get("All names");
        IntSummaryStatistics stats = results.get("Age stats");
        Person oldest = results.get("Oldest");

        System.out.println(allNames);
        System.out.println(stats);
        System.out.println(oldest);
    }

    interface Results {
        <R> R get(Object key);
    }

    // Registers named operations ("forks") and later runs all of them
    // against a single traversal of the source stream.
    static class StreamForker<T> {
        private final Stream<T> stream;
        private final Map<Object, Function<Stream<T>, ?>> forks = new HashMap<>();

        public StreamForker(Stream<T> stream) {
            this.stream = stream;
        }

        public StreamForker<T> fork(Object key, Function<Stream<T>, ?> f) {
            forks.put(key, f);
            return this;
        }

        public Results getResults() {
            // Traverse the source exactly once: every element is offered to
            // each fork's queue, then the end-of-stream sentinel is sent.
            ForkingStreamConsumer<T> consumer = build();
            try {
                stream.sequential().forEach(consumer);
            } finally {
                consumer.finish();
            }
            return consumer;
        }

        // Builds one blocking queue and one asynchronous computation
        // per registered fork.
        private ForkingStreamConsumer<T> build() {
            List<BlockingQueue<T>> queues = new ArrayList<>();

            Map<Object, Future<?>> actions =
                    forks.entrySet().stream().reduce(
                            new HashMap<>(),
                            (map, e) -> {
                                map.put(e.getKey(),
                                        getOperationResult(queues, e.getValue()));
                                return map;
                            },
                            (m1, m2) -> {
                                m1.putAll(m2);
                                return m1;
                            }
                    );
            return new ForkingStreamConsumer<>(queues, actions);
        }

        private Future<?> getOperationResult(List<BlockingQueue<T>> queues,
                                             Function<Stream<T>, ?> f) {
            // Give this fork its own queue and run its function asynchronously
            // over a stream that drains that queue.
            BlockingQueue<T> queue = new LinkedBlockingQueue<>();
            queues.add(queue);
            Spliterator<T> spliterator = new BlockingQueueSpliterator<>(queue);
            Stream<T> source = StreamSupport.stream(spliterator, false);
            return CompletableFuture.supplyAsync(() -> f.apply(source));
        }
    }

    // Pushes every element of the source stream into all of the forks'
    // queues and exposes each fork's result as a Future.
    static class ForkingStreamConsumer<T> implements Results, Consumer<T> {
        static final Object END_OF_STREAM = new Object();
        private final List<BlockingQueue<T>> queues;
        private final Map<Object, Future<?>> actions;

        ForkingStreamConsumer(List<BlockingQueue<T>> queues,
                              Map<Object, Future<?>> actions) {
            this.queues = queues;
            this.actions = actions;
        }

        public void finish() {
            // Signal the end of the stream by pushing a sentinel into every
            // queue; the unchecked cast smuggles it through the typed queues.
            accept((T) END_OF_STREAM);
        }

        @Override
        public <R> R get(Object key) {
            try {
                return ((Future<R>) actions.get(key)).get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void accept(T t) {
            // fan out: every queue receives every element of the stream
            queues.forEach(q -> q.add(t));
        }
    }

    // A Spliterator fed by a BlockingQueue: tryAdvance blocks until the
    // producer delivers the next element or the end-of-stream sentinel.
    static class BlockingQueueSpliterator<T> implements Spliterator<T> {

        private final BlockingQueue<T> q;

        public BlockingQueueSpliterator(BlockingQueue<T> q) {
            this.q = q;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            T t;
            while (true) {
                try {
                    t = q.take();
                    break;
                } catch (InterruptedException e) {
                    // interrupted while waiting: retry the take
                }
            }

            if (t != ForkingStreamConsumer.END_OF_STREAM) {
                action.accept(t);
                return true;
            }
            return false;
        }

        @Override
        public Spliterator<T> trySplit() {
            return null;
        }

        @Override
        public long estimateSize() {
            return 0; // size is unknown (and irrelevant: the stream is sequential and never split)
        }

        @Override
        public int characteristics() {
            return 0; // no characteristics to report
        }
    }

    static class Person {
        private int age;
        private String name;

        public Person(int age, String name) {
            this.age = age;
            this.name = name;
        }

        public int getAge() {
            return age;
        }

        public String getName() {
            return name;
        }

        @Override
        public String toString() {
            return String.format("Age: %d, name: %s", age, name);
        }
    }
}

How the author's code works:

First, we create a StreamForker out of a stream. Then we fork three operations, specifying what we want to do with that stream in parallel. In our case, the data model is the Person{age, name} class, and we want to perform three actions:

  • Get a string of all names
  • Get age statistics
  • Get the oldest person

We then call the forker.getResults() method, which applies a ForkingStreamConsumer to the stream, spreading its elements across three blocking queues; each queue is then turned into a stream and processed in parallel.
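
To make that mechanism concrete, here is a stripped-down, Java 8-compatible sketch of the same fan-out idea (class, method, and variable names here are mine, not the book's): one traversal of the source feeds a queue per fork, and each fork blocks on its own queue until a sentinel marks the end of the data.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.ToIntFunction;

public class FanOutSketch {
    // End-of-stream sentinel. (The book's version uses a shared Object
    // sentinel and an unchecked cast instead.)
    private static final String END = "\u0000END\u0000";

    public static void main(String[] args) throws Exception {
        List<String> source = Arrays.asList("a", "bb", "ccc");
        BlockingQueue<String> q1 = new LinkedBlockingQueue<>();
        BlockingQueue<String> q2 = new LinkedBlockingQueue<>();

        // Consumers start first; take() blocks until the producer delivers data.
        Future<Integer> count = CompletableFuture.supplyAsync(() -> drain(q1, s -> 1));
        Future<Integer> totalLength = CompletableFuture.supplyAsync(() -> drain(q2, String::length));

        // A single traversal of the source feeds every queue.
        for (String s : source) {
            q1.add(s);
            q2.add(s);
        }
        q1.add(END);
        q2.add(END);

        System.out.println(count.get() + " elements, total length " + totalLength.get());
    }

    // Sums f(element) over a queue's contents until the sentinel appears.
    private static int drain(BlockingQueue<String> q, ToIntFunction<String> f) {
        int sum = 0;
        while (true) {
            try {
                String s = q.take();
                if (s.equals(END)) {
                    return sum;
                }
                sum += f.applyAsInt(s);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }
}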

My question is, does this approach have any advantage over just doing this:

Future<String> allNames2 = CompletableFuture.supplyAsync(
        () -> people.stream().map(Person::getName).collect(Collectors.joining(", ")));
Future<IntSummaryStatistics> stats2 = CompletableFuture.supplyAsync(
        () -> people.stream().collect(Collectors.summarizingInt(Person::getAge)));
Future<Person> oldest2 = CompletableFuture.supplyAsync(
        () -> people.stream().reduce((p1, p2) -> p1.getAge() > p2.getAge() ? p1 : p2).get());

?
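
For reference, the results of that alternative would be retrieved like this (a sketch continuing the snippet above; Future.get() throws checked exceptions, so the enclosing method must declare throws Exception):

String allNames2Result = allNames2.get();           // blocks until the future completes
IntSummaryStatistics stats2Result = stats2.get();
Person oldest2Result = oldest2.get();

Each of the three futures performs its own independent traversal of people.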

asked Jun 17 '18 by Coder-Man


1 Answer

For me this doesn't make much sense with an ArrayList as the stream source.

If the stream source is a big file that you process with

StreamForker<Person> forker = new StreamForker<>(
    java.nio.file.Files.lines(Paths.get("somepath"))
        .map(Person::new))
    .fork(...)

then it could prove beneficial, since you would process the whole file only once, whereas with three separate calls to Files.lines(...) you would read the file three times.
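
For contrast, here is a sketch of what one of the multiple-futures operations would look like against the same file (this assumes, like the snippet above, a Person(String) constructor); every operation re-opens and re-reads the file:

Future<String> allNames = CompletableFuture.supplyAsync(() -> {
    // Each future opens its own stream over the file, so three
    // operations mean three full reads of "somepath".
    try (Stream<String> lines = java.nio.file.Files.lines(Paths.get("somepath"))) {
        return lines.map(Person::new)
                    .map(Person::getName)
                    .collect(Collectors.joining(", "));
    } catch (java.io.IOException e) {
        throw new java.io.UncheckedIOException(e);
    }
});
// ...and likewise for the age statistics and the oldest person, each
// with its own Files.lines(...) call.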

answered Oct 22 '22 by Thomas Kläger