
Java 8 One Stream To Multiple Map

Let's say I have a huge web server log file that does not fit in memory. I need to stream this file through a map-reduce step and save the results to a database, and I do this with the Java 8 Stream API. For example, after the map-reduce step I get lists such as consumption by client, consumption by IP, and consumption by content. My real requirements are not exactly like this example, but since I cannot share the code, I just want to give a basic example.

Using the Java 8 Stream API, I want to read the file exactly once and get all 3 lists at the same time while streaming the file, either in parallel or sequentially, though parallel would be preferable. Is there any way to do that?

Yılmaz asked Aug 13 '18 08:08



2 Answers

Generally, collecting into anything other than what the standard API gives you is fairly easy with a custom Collector. In your case, that means collecting into 3 lists at a time (just a small example that compiles, since you can't share your code either):

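// imports needed at the top of the enclosing file
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collector;

private static <T> Collector<T, ?, List<List<T>>> to3Lists() {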
    class Acc {

        List<T> left = new ArrayList<>();

        List<T> middle = new ArrayList<>();

        List<T> right = new ArrayList<>();

        List<List<T>> list = Arrays.asList(left, middle, right);

        void add(T elem) {
            // obviously do whatever you want here
            left.add(elem);
            middle.add(elem);
            right.add(elem);
        }

        Acc merge(Acc other) {

            left.addAll(other.left);
            middle.addAll(other.middle);
            right.addAll(other.right);

            return this;
        }

        public List<List<T>> finisher() {
            return list;
        }

    }
    return Collector.of(Acc::new, Acc::add, Acc::merge, Acc::finisher);
}

It can then be used like this:

Stream.of(1, 2, 3)
      .collect(to3Lists());

Obviously this custom collector does not do anything useful; it is just an example of how you could work with one.
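
To make the idea a bit more concrete, here is a minimal sketch of the same pattern applied to the three summaries from the question (consumption by client, by IP, and by content). The `LogEntry` and `Summary` classes, their field names, and the `toSummary()` method are assumptions made up for illustration; the question does not show the real data model. The accumulator updates three maps per element, and the merge step combines partial results, so the collector also works with a parallel stream.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collector;

class LogSummaryDemo {

    // Hypothetical log record; the field names are assumptions for illustration.
    static final class LogEntry {
        final String client;
        final String ip;
        final String content;
        final long bytes;

        LogEntry(String client, String ip, String content, long bytes) {
            this.client = client;
            this.ip = ip;
            this.content = content;
            this.bytes = bytes;
        }
    }

    // Mutable accumulator holding the three running totals.
    static final class Summary {
        final Map<String, Long> byClient = new HashMap<>();
        final Map<String, Long> byIp = new HashMap<>();
        final Map<String, Long> byContent = new HashMap<>();

        // Called once per stream element: update all three maps in one pass.
        void add(LogEntry e) {
            byClient.merge(e.client, e.bytes, Long::sum);
            byIp.merge(e.ip, e.bytes, Long::sum);
            byContent.merge(e.content, e.bytes, Long::sum);
        }

        // Used by parallel streams to fold one partial result into another.
        Summary merge(Summary other) {
            other.byClient.forEach((k, v) -> byClient.merge(k, v, Long::sum));
            other.byIp.forEach((k, v) -> byIp.merge(k, v, Long::sum));
            other.byContent.forEach((k, v) -> byContent.merge(k, v, Long::sum));
            return this;
        }
    }

    // Collector whose result type is the accumulator itself, so no finisher is needed.
    static Collector<LogEntry, Summary, Summary> toSummary() {
        return Collector.of(Summary::new, Summary::add, Summary::merge);
    }

    public static void main(String[] args) {
        List<LogEntry> entries = Arrays.asList(
                new LogEntry("client1", "10.0.0.1", "a.html", 100),
                new LogEntry("client1", "10.0.0.2", "b.html", 50),
                new LogEntry("client2", "10.0.0.2", "a.html", 25));

        // The source is traversed once; each element feeds all three maps.
        Summary summary = entries.parallelStream().collect(toSummary());

        System.out.println(summary.byClient);
        System.out.println(summary.byIp);
        System.out.println(summary.byContent);
    }
}
```

With a real log file, the source stream could come from `Files.lines(path)` mapped through a parsing function, which reads the file lazily instead of loading it into memory. Only the three result maps have to fit in memory.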

Eugene answered Oct 02 '22 12:10


I have adapted the answer to this question to your case. The custom Spliterator "splits" the stream into multiple streams that collect by different properties; each fork runs in its own thread and is fed through a queue:

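// imports needed at the top of the enclosing file
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

@SafeVarargs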
public static <T> long streamForked(Stream<T> source, Consumer<Stream<T>>... consumers)
{
    return StreamSupport.stream(new ForkingSpliterator<>(source, consumers), false).count();
}

public static class ForkingSpliterator<T>
    extends AbstractSpliterator<T>
{
    private Spliterator<T>         sourceSpliterator;

    private List<BlockingQueue<T>> queues = new ArrayList<>();

    // written by the thread driving the source, read by the fork threads
    private volatile boolean       sourceDone;

    @SafeVarargs
    private ForkingSpliterator(Stream<T> source, Consumer<Stream<T>>... consumers)
    {
        super(Long.MAX_VALUE, 0);

        sourceSpliterator = source.spliterator();

        for (Consumer<Stream<T>> fork : consumers)
        {
            LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();
            queues.add(queue);
            new Thread(() -> fork.accept(StreamSupport.stream(new ForkedConsumer(queue), false))).start();
        }
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action)
    {
        sourceDone = !sourceSpliterator.tryAdvance(t -> queues.forEach(queue -> queue.offer(t)));
        return !sourceDone;
    }

    private class ForkedConsumer
        extends AbstractSpliterator<T>
    {
        private BlockingQueue<T> queue;

        private ForkedConsumer(BlockingQueue<T> queue)
        {
            super(Long.MAX_VALUE, 0);
            this.queue = queue;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action)
        {
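            while (queue.peek() == null)
            {
                // re-check the queue after seeing sourceDone: the last element may
                // have been offered between the peek above and the flag being set
                if (sourceDone && queue.peek() == null)
                {
                    // queue is empty and there won't be any more elements, so "terminate" this sub stream
                    return false;
                }
            }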

            // push to consumer pipeline
            action.accept(queue.poll());

            return true;
        }
    }
}

You can use it as follows:

streamForked(Stream.of(new Row("content1", "client1", "location1", 1),
                       new Row("content2", "client1", "location1", 2),
                       new Row("content1", "client1", "location2", 3),
                       new Row("content2", "client2", "location2", 4),
                       new Row("content1", "client2", "location2", 5)),
             rows -> System.out.println(rows.collect(Collectors.groupingBy(Row::getClient,
                                                                           Collectors.groupingBy(Row::getContent,
                                                                                                 Collectors.summingInt(Row::getConsumption))))),
             rows -> System.out.println(rows.collect(Collectors.groupingBy(Row::getClient,
                                                                           Collectors.groupingBy(Row::getLocation,
                                                                                                 Collectors.summingInt(Row::getConsumption))))),
             rows -> System.out.println(rows.collect(Collectors.groupingBy(Row::getContent,
                                                                           Collectors.groupingBy(Row::getLocation,
                                                                                                 Collectors.summingInt(Row::getConsumption))))));

// Output (the order of the three lines may vary, since each consumer runs in its own thread)
// {client2={location2=9}, client1={location1=3, location2=3}}
// {client2={content2=4, content1=5}, client1={content2=2, content1=4}}
// {content2={location1=2, location2=4}, content1={location1=1, location2=8}}

Note that you can do pretty much anything you want with the copies of the stream. As per your example, I used a stacked groupingBy collector to group the rows by two properties and then sum up the int property, so the result is a Map<String, Map<String, Integer>>. But you could also use it for other scenarios:

rows -> System.out.println(rows.count())
rows -> rows.forEach(row -> System.out.println(row))
rows -> System.out.println(rows.anyMatch(row -> row.getConsumption() > 3))
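
The snippets above use a `Row` class that is not shown. The following is a minimal sketch of what it might look like, together with the stacked groupingBy collector run on a plain stream so the shape of the resulting Map<String, Map<String, Integer>> is easier to see. The field layout of `Row` is inferred from the constructor calls and getters used above, and the `RowGroupingDemo` class and its helper methods are assumptions added for illustration.

```java
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class RowGroupingDemo {

    // Hypothetical Row class; the argument order follows the constructor calls above.
    static final class Row {
        private final String content;
        private final String client;
        private final String location;
        private final int consumption;

        Row(String content, String client, String location, int consumption) {
            this.content = content;
            this.client = client;
            this.location = location;
            this.consumption = consumption;
        }

        String getContent() { return content; }
        String getClient() { return client; }
        String getLocation() { return location; }
        int getConsumption() { return consumption; }
    }

    // Same sample data as in the usage example above.
    static Stream<Row> sampleRows() {
        return Stream.of(new Row("content1", "client1", "location1", 1),
                         new Row("content2", "client1", "location1", 2),
                         new Row("content1", "client1", "location2", 3),
                         new Row("content2", "client2", "location2", 4),
                         new Row("content1", "client2", "location2", 5));
    }

    // Outer key: client. Inner key: content. Value: summed consumption.
    static Map<String, Map<String, Integer>> consumptionByClientAndContent(Stream<Row> rows) {
        return rows.collect(Collectors.groupingBy(Row::getClient,
                Collectors.groupingBy(Row::getContent,
                        Collectors.summingInt(Row::getConsumption))));
    }

    public static void main(String[] args) {
        Map<String, Map<String, Integer>> result = consumptionByClientAndContent(sampleRows());

        System.out.println(result.get("client1").get("content1")); // 4 (rows with 1 and 3)
        System.out.println(result.get("client1").get("content2")); // 2
        System.out.println(result.get("client2").get("content1")); // 5
        System.out.println(result.get("client2").get("content2")); // 4
    }
}
```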
Malte Hartwig answered Oct 02 '22 11:10