Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How should I sum something with streams?

Tags:

java

java-8

I've seen and tried different implementations of how to sum something in a stream. Here is my code:

List<Person> persons = new ArrayList<Person>();

for(int i=0; i < 10000000; i++){
    persons.add(new Person("random", 26));
}

Long start = System.currentTimeMillis();
int test = persons.stream().collect(Collectors.summingInt(p -> p.getAge()));
Long end = System.currentTimeMillis();
System.out.println("Sum of ages = " + test + " and it took : " + (end - start) + " ms with collectors");

Long start3 = System.currentTimeMillis();
int test3 = persons.parallelStream().collect(Collectors.summingInt(p -> p.getAge()));
Long end3 = System.currentTimeMillis();
System.out.println("Sum of ages = " + test3 + " and it took : " + (end3 - start3) + " ms with collectors and parallel stream");


Long start2 = System.currentTimeMillis();
int test2 = persons.stream().mapToInt(p -> p.getAge()).sum();
Long end2 = System.currentTimeMillis();
System.out.println("Sum of ages = " + test2 + " and it took : " + (end2 - start2) + " ms with map and sum");

Long start4 = System.currentTimeMillis();
int test4 = persons.parallelStream().mapToInt(p -> p.getAge()).sum();
Long end4 = System.currentTimeMillis();
System.out.println("Sum of ages = " + test4 + " and it took : " + (end4 - start4) + " ms with map and sum and parallel stream");

which gave me the following result :

Sum of ages = 220000000 and it took : 110 ms with collectors
Sum of ages = 220000000 and it took : 272 ms with collectors and parallel stream
Sum of ages = 220000000 and it took : 137 ms with map and sum
Sum of ages = 220000000 and it took : 134 ms with map and sum and parallel stream

I tried it several times and gave me different results each time (most of the time the last solution is the best), so I was wondering:

1) What is the correct way to do it?

2) Why? (What is the difference to other solutions?)

like image 682
Louis F. Avatar asked Jun 13 '14 05:06

Louis F.


1 Answers

Before we get into the actual answer, a few things you should know:

  1. The results of your test can vary quite strongly, depending on many factors (e.g. the computer you're running it on). Here are the results of one run on my 8 core machine:

    Sum of ages = 260000000 and it took : 94 ms with collectors
    Sum of ages = 260000000 and it took : 61 ms with collectors and parallel stream
    Sum of ages = 260000000 and it took : 70 ms with map and sum
    Sum of ages = 260000000 and it took : 94 ms with map and sum and parallel stream
    

    And then in a later run:

    Sum of ages = 260000000 and it took : 68 ms with collectors
    Sum of ages = 260000000 and it took : 67 ms with collectors and parallel stream
    Sum of ages = 260000000 and it took : 66 ms with map and sum
    Sum of ages = 260000000 and it took : 109 ms with map and sum and parallel stream
    
  2. Micro benchmarking isn't an easy topic. There are methods to do it (and I'll get into some later) but just trying to use System.currentTimeMillies() won't work reliably in most cases.

  3. Just because Java 8 makes parallel operations easy, that doesn't mean that they should be used everywhere. Parallel operations make sense in some cases and don't in others.

OK, now let's have a look at the various methods you're using.

  • Sequential collectors: The summingInt collector you use has the following implementation:

    public static <T> Collector<T, ?, Integer> summingInt(ToIntFunction<? super T> mapper) {
        return new CollectorImpl<>(
                () -> new int[1],
                (a, t) -> { a[0] += mapper.applyAsInt(t); },
                (a, b) -> { a[0] += b[0]; return a; },
                a -> a[0], Collections.emptySet());
    }
    

    So, first of all a new array with one element will be created. Then for every Person element in your stream the collect function will use the Person#getAge() function to retrieve the age as an Integer (not an int!) and add that age to the previous ones (in the 1D-array). Finally, when the whole stream has been dealt with, it will extract the value from that array and return it. So, there is a lot of auto-boxing and -unboxing going on here.

  • Parallel collectors: This uses the ReferencePipeline#forEach(Consumer) function to accumulate the ages it gets from the mapping function. Again there is a lot of auto-boxing and -unboxing.
  • Sequential map and sum: Here you map your Stream<Person> to an IntStream. One thing this means is that no auto-boxing or -unboxing is required any more; this can in some cases save a lot of time. Then it sums the resulting stream using the following implementation:

    @Override
    public final int sum() {
        return reduce(0, Integer::sum);
    }
    

    The reduce function here will call ReduceOps#ReduceOp#evaluateSequential(PipelineHelper<T> helper, Spliterator<P_IN> spliterator). This will, in essence, use the Integer::sum function on all of your numbers, starting with 0 and the first number and then the result of that with the second number and so forth.

  • Parallel map and sum: Here things get interesting. It uses the same sum() function, however the reduce will in this case call ReduceOps#ReduceOp#evaluateParallel(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) rather than the sequential option. This will basically use a divide and conquer method to add up the values. Now, the big advantage of divide and conquer is of course, that it can easily be done in parallel. However it does require splitting and re-joining the stream many times, which costs time. So how fast it is can vary quite heavily, depending on the complexity of the actual task it has to do with the elements. In the case of adding, it's probably not worth it in most cases; as you can see from my results it was always one of the slower methods.

Now, to get a real idea of how long what takes, let's do a proper micro benchmark. I'll be using JMH with the following benchmark code:

package com.stackoverflow.user2352924;

import org.openjdk.jmh.annotations.*;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.MINUTES)
@Warmup(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 10, time = 10, timeUnit = TimeUnit.SECONDS)
@State(Scope.Benchmark)
@Fork(1)
@Threads(2)
public class MicroBenchmark {

    private static List<Person> persons = new ArrayList<>();

    private int test;

    static {
        for(int i=0; i < 10000000; i++){
            persons.add(new Person("random", 26));
        }
    }

    @Benchmark
    public void sequentialCollectors() {
        test = 0;
        test += persons.stream().collect(Collectors.summingInt(p -> p.getAge()));
    }

    @Benchmark
    public void parallelCollectors() {
        test = 0;
        test += persons.parallelStream().collect(Collectors.summingInt(p -> p.getAge()));
    }

    @Benchmark
    public void sequentialMapSum() {
        test = 0;
        test += persons.stream().mapToInt(p -> p.getAge()).sum();
    }

    @Benchmark
    public void parallelMapSum() {
        test = 0;
        test += persons.parallelStream().mapToInt(p -> p.getAge()).sum();
    }

}

The pom.xml for this maven project looks like this:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.stackoverflow.user2352924</groupId>
    <artifactId>StackOverflow</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>

    <name>Auto-generated JMH benchmark</name>

    <prerequisites>
        <maven>3.0</maven>
    </prerequisites>

    <dependencies>
        <dependency>
            <groupId>org.openjdk.jmh</groupId>
            <artifactId>jmh-core</artifactId>
            <version>${jmh.version}</version>
        </dependency>
        <dependency>
            <groupId>org.openjdk.jmh</groupId>
            <artifactId>jmh-generator-annprocess</artifactId>
            <version>${jmh.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <jmh.version>0.9.5</jmh.version>
        <javac.target>1.8</javac.target>
        <uberjar.name>benchmarks</uberjar.name>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <compilerVersion>${javac.target}</compilerVersion>
                    <source>${javac.target}</source>
                    <target>${javac.target}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.2</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <finalName>microbenchmarks</finalName>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.openjdk.jmh.Main</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
        <pluginManagement>
            <plugins>
                <plugin>
                    <artifactId>maven-clean-plugin</artifactId>
                    <version>2.5</version>
                </plugin>
                <plugin>
                    <artifactId>maven-deploy-plugin</artifactId>
                    <version>2.8.1</version>
                </plugin>
                <plugin>
                    <artifactId>maven-install-plugin</artifactId>
                    <version>2.5.1</version>
                </plugin>
                <plugin>
                    <artifactId>maven-jar-plugin</artifactId>
                    <version>2.4</version>
                </plugin>
                <plugin>
                    <artifactId>maven-javadoc-plugin</artifactId>
                    <version>2.9.1</version>
                </plugin>
                <plugin>
                    <artifactId>maven-resources-plugin</artifactId>
                    <version>2.6</version>
                </plugin>
                <plugin>
                    <artifactId>maven-site-plugin</artifactId>
                    <version>3.3</version>
                </plugin>
                <plugin>
                    <artifactId>maven-source-plugin</artifactId>
                    <version>2.2.1</version>
                </plugin>
                <plugin>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>2.17</version>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

</project>

Make sure that Maven is running with Java 8 too, otherwise you'll get ugly errors.

I won't go into details about how to use JMH here (there are other places that do that) but here's the result I got:

# Run complete. Total time: 00:08:48

Benchmark                                     Mode  Samples     Score  Score error    Units
c.s.u.MicroBenchmark.parallelCollectors      thrpt       10  3658,949      775,115  ops/min
c.s.u.MicroBenchmark.parallelMapSum          thrpt       10  2616,905      221,109  ops/min
c.s.u.MicroBenchmark.sequentialCollectors    thrpt       10  5502,160      439,024  ops/min
c.s.u.MicroBenchmark.sequentialMapSum        thrpt       10  6120,162      609,232  ops/min

So, on my system at the time I ran those tests, the sequential map sum was considerably faster, managing to do over 6100 operations in the time that the parallel map sum (using a divide and conquer method) managed to do only just over 2600. In fact, the sequential methods were both considerably faster than the parallel ones.

Now, in a situation which can more easily be run in parallel - e.g. where the Person#getAge() function was much more complex than just a getter - the parallel methods may well be a much better solution. In the end it all depends on the efficiency of parallel runs in the case being tested.

Another thing to remember: if in doubt, do a proper micro benchmark. ;-)

like image 86
blalasaadri Avatar answered Nov 02 '22 17:11

blalasaadri