
Performance comparison between Rxjava2, Java 8 Streams, Plain Old Iteration

I have become a big fan of functional programming in Java 8 and also of RxJava, but a colleague recently pointed out that there is a performance hit when using them. So I decided to run JMH benchmarks, and it seems he was right. No matter what I do, I can't get the streams version to perform better. Below is my code:

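import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OperationsPerInvocation;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

@OutputTimeUnit(TimeUnit.NANOSECONDS)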
@BenchmarkMode(Mode.AverageTime)
@OperationsPerInvocation(StreamVsVanilla.N)
public class StreamVsVanilla {
    public static final int N = 10000;

    static List<Integer> sourceList = new ArrayList<>(N);
    static {
        for (int i = 0; i < N; i++) {
            sourceList.add(i);
        }
    }

    @Benchmark
    public List<Double> vanilla() {
        List<Double> result = new ArrayList<Double>(sourceList.size() / 2 + 1);
        for (Integer i : sourceList) {
            if (i % 2 == 0){
                result.add(Math.sqrt(i));
            }
        }
        return result;
    }

    @Benchmark
    public List<Double> stream() {
        return  sourceList.stream().parallel()
                .mapToInt(Integer::intValue)
                .filter(i -> i % 2 == 0)
                .mapToDouble(i->(double)i)
                .map(Math::sqrt)
                .boxed()
                .collect(Collectors.toList());
    }

    @Benchmark
    public List<Double> rxjava2(){
        return Flowable.fromIterable(sourceList)
                       .parallel()
                       .runOn(Schedulers.computation())
                       .filter(i->i%2==0)
                       .map(Math::sqrt)
                       .collect(()->new ArrayList<Double>(sourceList.size()/2+1),ArrayList::add)
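                       .sequential()
                       // collect() emits one list per rail, so blockingFirst() returns only the
                       // first rail's list, not the complete result
                       .blockingFirst();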

    }

    public static void main(String[] args) throws RunnerException {

        Options options = new OptionsBuilder()
                .include(StreamVsVanilla.class.getSimpleName()).threads(1)
                .forks(1).shouldFailOnError(true).shouldDoGC(true)
                .jvmArgs("-server").build();
        new Runner(options).run();

    }
}
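In the parallel rxjava2() benchmark above, ParallelFlowable.collect(...) fills one collection per rail, so sequential() produces several lists and blockingFirst() keeps only one of them. The stdlib-only sketch below imitates that shape so the effect can be checked without RxJava. It is a hypothetical analogue, not RxJava's implementation: the class and method names are made up, and the strict "rail r gets indices r, r + rails, ..." split is a simplification of how RxJava actually distributes elements to rails.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class RailCollectSketch {
    // Hypothetical stand-in for parallel().runOn(...).filter(...).map(...).collect(...):
    // each rail filters and maps its own share of the source into its own list.
    static List<List<Double>> collectPerRail(List<Integer> source, int rails) {
        ExecutorService pool = Executors.newFixedThreadPool(rails);
        try {
            List<Future<List<Double>>> futures = new ArrayList<>();
            for (int r = 0; r < rails; r++) {
                final int rail = r;
                futures.add(pool.submit(() -> {
                    List<Double> out = new ArrayList<>();
                    // Simplified strict round-robin: rail r sees indices r, r + rails, r + 2 * rails, ...
                    for (int i = rail; i < source.size(); i += rails) {
                        int v = source.get(i);
                        if (v % 2 == 0) {
                            out.add(Math.sqrt(v));
                        }
                    }
                    return out;
                }));
            }
            List<List<Double>> perRail = new ArrayList<>();
            for (Future<List<Double>> f : futures) {
                perRail.add(f.get());
            }
            return perRail;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // Concatenates every rail's list: the complete result the benchmark presumably meant to return.
    static List<Double> mergeAll(List<List<Double>> perRail) {
        List<Double> all = new ArrayList<>();
        for (List<Double> railList : perRail) {
            all.addAll(railList);
        }
        return all;
    }

    public static void main(String[] args) {
        List<Integer> source = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            source.add(i);
        }
        List<List<Double>> perRail = collectPerRail(source, 3);
        // Taking a single rail's list mirrors what blockingFirst() does in the real pipeline.
        System.out.println("one rail only:    " + perRail.get(0).size()); // 1667
        System.out.println("all rails merged: " + mergeAll(perRail).size()); // 5000
    }
}
```

In RxJava itself, the usual way to get one complete list would be to call sequential() before collecting (at the cost of ordering), but that variant is not shown here because it needs the RxJava dependency.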

Results for the above code:

# Run complete. Total time: 00:03:16

Benchmark                Mode  Cnt     Score     Error  Units
StreamVsVanilla.rxjava2  avgt   20  1179.733 ± 322.421  ns/op
StreamVsVanilla.stream   avgt   20    10.556 ±   1.195  ns/op
StreamVsVanilla.vanilla  avgt   20     8.220 ±   0.705  ns/op
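Because the class is annotated with @OperationsPerInvocation(StreamVsVanilla.N), JMH divides the time of each benchmark invocation by N = 10000, so the scores above are per element rather than per call. Multiplying back gives the approximate time of one whole call (simple arithmetic on the reported scores, ignoring the error bounds):

```latex
\begin{aligned}
t_{\text{call}} &= \text{score} \times N, \qquad N = 10^{4} \\
t_{\text{rxjava2}} &\approx 1179.733\ \text{ns} \times 10^{4} \approx 11.8\ \text{ms} \\
t_{\text{stream}} &\approx 10.556\ \text{ns} \times 10^{4} \approx 105.6\ \mu\text{s} \\
t_{\text{vanilla}} &\approx 8.220\ \text{ns} \times 10^{4} = 82.2\ \mu\text{s} \\
t_{\text{rxjava2}} / t_{\text{stream}} &\approx 1179.733 / 10.556 \approx 112
\end{aligned}
```

That ratio is the "two orders of magnitude" gap between the parallel RxJava and parallel Stream versions.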

Even if I remove the parallel operators and use sequential versions, as below:

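import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import io.reactivex.Observable;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OperationsPerInvocation;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

@OutputTimeUnit(TimeUnit.NANOSECONDS)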
@BenchmarkMode(Mode.AverageTime)
@OperationsPerInvocation(StreamVsVanilla.N)
public class StreamVsVanilla {
    public static final int N = 10000;

    static List<Integer> sourceList = new ArrayList<>(N);
    static {
        for (int i = 0; i < N; i++) {
            sourceList.add(i);
        }
    }

    @Benchmark
    public List<Double> vanilla() {
        List<Double> result = new ArrayList<Double>(sourceList.size() / 2 + 1);
        for (Integer i : sourceList) {
            if (i % 2 == 0){
                result.add(Math.sqrt(i));
            }
        }
        return result;
    }

    @Benchmark
    public List<Double> stream() {
        return  sourceList.stream()
                .mapToInt(Integer::intValue)
                .filter(i -> i % 2 == 0)
                .mapToDouble(i->(double)i)
                .map(Math::sqrt)
                .boxed()
                .collect(Collectors.toList());
    }

    @Benchmark
    public List<Double> rxjava2(){
        return Observable.fromIterable(sourceList)
                       .filter(i->i%2==0)
                       .map(Math::sqrt)
                       .collect(()->new ArrayList<Double>(sourceList.size()/2+1),ArrayList::add)
                       .blockingGet();

    }

    public static void main(String[] args) throws RunnerException {

        Options options = new OptionsBuilder()
                .include(StreamVsVanilla.class.getSimpleName()).threads(1)
                .forks(1).shouldFailOnError(true).shouldDoGC(true)
                .jvmArgs("-server").build();
        new Runner(options).run();

    }
}

The results are not very favourable:

# Run complete. Total time: 00:03:16

Benchmark                Mode  Cnt   Score   Error  Units
StreamVsVanilla.rxjava2  avgt   20  12.226 ± 0.603  ns/op
StreamVsVanilla.stream   avgt   20  13.432 ± 0.858  ns/op
StreamVsVanilla.vanilla  avgt   20   7.678 ± 0.350  ns/op

Can somebody help me figure out what I am doing wrong?

Edit:

akarnokd pointed out that I was using extra stages to unbox and box in the sequential stream version (I had added them to avoid implicit boxing and unboxing in the filter and map methods). However, it was still slower, so I tried without those stages using the code below:

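import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import io.reactivex.Observable;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OperationsPerInvocation;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

@OutputTimeUnit(TimeUnit.NANOSECONDS)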
@BenchmarkMode(Mode.AverageTime)
@OperationsPerInvocation(StreamVsVanilla.N)
public class StreamVsVanilla {
    public static final int N = 10000;

    static List<Integer> sourceList = new ArrayList<>(N);
    static {
        for (int i = 0; i < N; i++) {
            sourceList.add(i);
        }
    }

    @Benchmark
    public List<Double> vanilla() {
        List<Double> result = new ArrayList<Double>(sourceList.size() / 2 + 1);
        for (Integer i : sourceList) {
            if (i % 2 == 0){
                result.add(Math.sqrt(i));
            }
        }
        return result;
    }

    @Benchmark
    public List<Double> stream() {
        return  sourceList.stream()
                .filter(i -> i % 2 == 0)
                .map(Math::sqrt)
                .collect(Collectors.toList());
    }

    @Benchmark
    public List<Double> rxjava2(){
        return Observable.fromIterable(sourceList)
                       .filter(i->i%2==0)
                       .map(Math::sqrt)
                       .collect(()->new ArrayList<Double>(sourceList.size()/2+1),ArrayList::add)
                       .blockingGet();

    }

    public static void main(String[] args) throws RunnerException {

        Options options = new OptionsBuilder()
                .include(StreamVsVanilla.class.getSimpleName()).threads(1)
                .forks(1).shouldFailOnError(true).shouldDoGC(true)
                .jvmArgs("-server").build();
        new Runner(options).run();

    }
}

The results are still more or less the same:

# Run complete. Total time: 00:03:16

Benchmark                Mode  Cnt   Score   Error  Units
StreamVsVanilla.rxjava2  avgt   20  10.864 ± 0.555  ns/op
StreamVsVanilla.stream   avgt   20  10.466 ± 0.050  ns/op
StreamVsVanilla.vanilla  avgt   20   7.513 ± 0.136  ns/op
asked Dec 30 '17 09:12 by Shariq


1 Answer

For the parallel version

It is relatively expensive to fire up threads and dispatch values to them. For parallelism to pay off, the per-element computation usually has to be several times more costly than this infrastructure overhead. In your RxJava case, however, Math::sqrt is so trivial that the parallel overhead dominates the performance.

Then why is Stream two orders of magnitude faster? I can only assume that work stealing comes into play: the benchmark thread does most of the actual work, and maybe one background thread does a small amount of the rest, because by the time the background thread spins up, the main thread has stolen most of the tasks back. So you don't get strictly parallel execution there, unlike RxJava's parallel(), where the operator dispatches work in a round-robin fashion so that all parallel rails can become roughly equally busy.
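One part of this is easy to observe: a parallel stream's terminal operation runs on the common ForkJoinPool, and the thread that calls it also executes part of the work itself instead of only waiting. The sketch below counts how many elements of a trivial parallel stream end up on the calling thread. The class and method names are illustrative; the counts vary between runs and machines, so it only shows that the caller participates, not how the work is split in the JMH benchmark.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

final class CallerParticipation {
    // Runs a trivial parallel stream, records every thread that processed an element,
    // and returns how many elements were processed by the calling thread itself.
    static int elementsOnCaller(int n, Set<String> threadNames) {
        Thread caller = Thread.currentThread();
        AtomicInteger onCaller = new AtomicInteger();
        IntStream.range(0, n).parallel().forEach(i -> {
            threadNames.add(Thread.currentThread().getName());
            if (Thread.currentThread() == caller) {
                onCaller.incrementAndGet();
            }
        });
        return onCaller.get();
    }

    public static void main(String[] args) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        int onCaller = elementsOnCaller(10_000, names);
        // Both values depend on scheduling and on how many pool threads wake up in time.
        System.out.println("elements on calling thread: " + onCaller + " of 10000");
        System.out.println("threads used: " + names);
    }
}
```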

For the sequential version

I think the extra unboxing and boxing stages in your Stream version add a little bit of overhead. Try without them:

   return  sourceList.stream()
            .filter(i -> i % 2 == 0)
            .map(Math::sqrt)
            .collect(Collectors.toList());
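Both sequential stream variants compute exactly the same List<Double> as the plain loop, so the comparison is like-for-like; they differ only in where boxing happens. The primitive variant unboxes once in mapToInt, but because the result type is List<Double>, boxed() still has to box every result, and mapToDouble adds one more stage. The sketch below (illustrative class and method names, stdlib only) checks that all three produce equal lists.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

final class BoxingPipelines {
    // Plain loop, as in the question's vanilla() benchmark.
    static List<Double> vanilla(List<Integer> source) {
        List<Double> result = new ArrayList<>(source.size() / 2 + 1);
        for (Integer i : source) {
            if (i % 2 == 0) {
                result.add(Math.sqrt(i)); // unboxes the Integer, boxes the double result
            }
        }
        return result;
    }

    // Boxed stream: filter unboxes for i % 2, map unboxes for Math.sqrt and boxes the result.
    static List<Double> boxedStream(List<Integer> source) {
        return source.stream()
                .filter(i -> i % 2 == 0)
                .map(Math::sqrt)
                .collect(Collectors.toList());
    }

    // Primitive stream: unboxes once in mapToInt, but boxed() must still box every
    // result because the return type is List<Double>.
    static List<Double> primitiveStream(List<Integer> source) {
        return source.stream()
                .mapToInt(Integer::intValue)
                .filter(i -> i % 2 == 0)
                .mapToDouble(i -> (double) i)
                .map(Math::sqrt)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> source = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            source.add(i);
        }
        List<Double> expected = vanilla(source);
        System.out.println(expected.equals(boxedStream(source)));     // true
        System.out.println(expected.equals(primitiveStream(source))); // true
    }
}
```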
answered Sep 20 '22 13:09 by akarnokd