Reusable single instance wrapper/object in Java stream map

It seems like this question should already have an answer, but I could not find a duplicate.

Anyway, I am wondering what the community thinks about a Stream.map use case like this:

Wrapper wrapper = new Wrapper();
list.stream()
    .map( s -> {
        wrapper.setSource(s);
        return wrapper;
    } )
    .forEach( w -> processWrapper(w) );
    
public static class Source {
    private final String name;
        
    public Source(String name) {
        this.name = name;
    }
        
    public String getName() {
        return name;
    }
}
    
public static class Wrapper {
    private Source source = null;
        
    public void setSource(Source source) {
        this.source = source;
    }
        
    public String getName() {
        return source.getName();
    }
}

public void processWrapper(Wrapper wrapper) {
}

I am not a big fan of this usage of map, but it can potentially help with performance when dealing with large streams by avoiding the creation of an unnecessary Wrapper for every Source.

This definitely has its limitations, such as being almost useless with parallel streams and with terminal operations like collect.

Update - The question is not about "how to do it" but "can I do it this way". For example, I may have code that only works with Wrapper and I want to invoke it in forEach, but I want to avoid creating a new instance for each Source element.

Benchmark Results

They show roughly an 8-fold improvement with the reusable wrapper:

Benchmark                     (N)  Mode  Cnt    Score     Error  Units
BenchmarkTest.noReuse    10000000  avgt    5  870.253 ± 122.495  ms/op
BenchmarkTest.withReuse  10000000  avgt    5  113.694 ±   2.528  ms/op

Benchmark code -

import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
@Fork(value = 2, jvmArgs = {"-Xms2G", "-Xmx2G"})
public class BenchmarkTest {

    @Param({"10000000"})
    private int N;

    private List<Source> data;

    public static void main(String[] args) throws Exception {
        Options opt = new OptionsBuilder()
            .include(BenchmarkTest.class.getSimpleName())
            .forks(1)
            .build();
        new Runner(opt).run();
    }

    @Setup
    public void setup() {
        data = createData();
    }

    @Benchmark
    public void noReuse(Blackhole bh) {
        data.stream()
            .map( s -> new Wrapper1( s.getName() ) )
            .forEach( t -> processTarget(bh, t) );
    }

    @Benchmark
    public void withReuse(Blackhole bh) {
        Wrapper2 wrapper = new Wrapper2();
        data.stream()
            .map( s -> { wrapper.setSource(s); return wrapper; } )
            .forEach( w -> processTarget(bh, w) );
    }
    
    public void processTarget(Blackhole bh, Wrapper t) {
        bh.consume(t);
    }
    
    private List<Source> createData() {
        List<Source> data = new ArrayList<>();
        for (int i = 0; i < N; i++) {
            data.add( new Source("Number : " + i) );
        }
        return data;
    }
    
    public static class Source {
        private final String name;

        public Source(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }
    }

    public interface Wrapper {
        public String getName();
    }
    
    public static class Wrapper1 implements Wrapper {
        private final String name;

        public Wrapper1(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }
    }
    
    public static class Wrapper2 implements Wrapper {
        private Source source = null;

        public void setSource(Source source) {
            this.source = source;
        }

        public String getName() {
            return source.getName();
        }
    }
}

Full benchmark report -

# JMH version: 1.21
# VM version: JDK 1.8.0_191, Java HotSpot(TM) 64-Bit Server VM, 25.191-b12
# VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_191.jdk/Contents/Home/jre/bin/java
# VM options: -Xms2G -Xmx2G
# Warmup: 5 iterations, 10 s each
# Measurement: 5 iterations, 10 s each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Average time, time/op
# Benchmark: BenchmarkTest.noReuse
# Parameters: (N = 10000000)

# Run progress: 0.00% complete, ETA 00:03:20
# Fork: 1 of 1
# Warmup Iteration   1: 1083.656 ms/op
# Warmup Iteration   2: 846.485 ms/op
# Warmup Iteration   3: 901.164 ms/op
# Warmup Iteration   4: 849.659 ms/op
# Warmup Iteration   5: 903.805 ms/op
Iteration   1: 847.008 ms/op
Iteration   2: 895.800 ms/op
Iteration   3: 892.642 ms/op
Iteration   4: 825.901 ms/op
Iteration   5: 889.914 ms/op


Result "BenchmartTest.noReuse":
  870.253 ±(99.9%) 122.495 ms/op [Average]
  (min, avg, max) = (825.901, 870.253, 895.800), stdev = 31.812
  CI (99.9%): [747.758, 992.748] (assumes normal distribution)


# JMH version: 1.21
# VM version: JDK 1.8.0_191, Java HotSpot(TM) 64-Bit Server VM, 25.191-b12
# VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_191.jdk/Contents/Home/jre/bin/java
# VM options: -Xms2G -Xmx2G
# Warmup: 5 iterations, 10 s each
# Measurement: 5 iterations, 10 s each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Average time, time/op
# Benchmark: BenchmarkTest.withReuse
# Parameters: (N = 10000000)

# Run progress: 50.00% complete, ETA 00:01:58
# Fork: 1 of 1
# Warmup Iteration   1: 113.780 ms/op
# Warmup Iteration   2: 113.643 ms/op
# Warmup Iteration   3: 114.323 ms/op
# Warmup Iteration   4: 114.258 ms/op
# Warmup Iteration   5: 117.351 ms/op
Iteration   1: 114.526 ms/op
Iteration   2: 113.944 ms/op
Iteration   3: 113.943 ms/op
Iteration   4: 112.930 ms/op
Iteration   5: 113.124 ms/op


Result "BenchmarkTest.withReuse":
  113.694 ±(99.9%) 2.528 ms/op [Average]
  (min, avg, max) = (112.930, 113.694, 114.526), stdev = 0.657
  CI (99.9%): [111.165, 116.222] (assumes normal distribution)


# Run complete. Total time: 00:03:40

REMEMBER: The numbers below are just data. To gain reusable insights, you need to follow up on
why the numbers are the way they are. Use profilers (see -prof, -lprof), design factorial
experiments, perform baseline and negative tests that provide experimental control, make sure
the benchmarking environment is safe on JVM/OS/HW level, ask for reviews from the domain experts.
Do not assume the numbers tell you what you want them to tell.

Benchmark                     (N)  Mode  Cnt    Score     Error  Units
BenchmarkTest.noReuse    10000000  avgt    5  870.253 ± 122.495  ms/op
BenchmarkTest.withReuse  10000000  avgt    5  113.694 ±   2.528  ms/op
asked Jun 05 '19 by tsolakp


1 Answer

Your approach happens to work because the stream pipeline consists only of stateless operations. In such constellations, the sequential stream evaluation may process one element at a time, so accesses to the wrapper instance do not overlap, as illustrated here. But note that this is not guaranteed behavior.

It definitely doesn’t work with stateful operations like sorted and distinct. It also can’t work with reduction operations, as they always have to hold at least two elements for processing, which includes reduce, min, and max. In the case of collect, it depends on the particular Collector. forEachOrdered wouldn’t work with parallel streams, due to the required buffering.
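
To make the collect case concrete, here is a small sketch (reusing the Source and Wrapper classes from the question) of what happens when the reused wrapper is collected into a list: every entry refers to the same object, so they all report the name of the last element.

Wrapper wrapper = new Wrapper();
List<Wrapper> collected = list.stream()
    .map( s -> { wrapper.setSource(s); return wrapper; } )
    .collect(Collectors.toList());

// All entries are the same Wrapper instance, holding only the last Source.
collected.forEach( w -> System.out.println(w.getName()) );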

Note that parallel processing would be problematic even if you used ThreadLocal to create thread-confined wrappers, as there is no guarantee that objects created in one worker thread stay local to that thread. A worker thread may hand over a partial result to another thread before picking up another, unrelated workload.

So this shared mutable wrapper works with a particular set of stateless operations, like map, filter, forEach, findFirst/Any, and all/any/noneMatch, in a sequential execution of a particular implementation. You don’t get the flexibility of the API, as you have to limit yourself: you can’t pass the stream to arbitrary code expecting a Stream, nor use arbitrary Collector implementations. You also don’t have the encapsulation of the interface, as you are assuming particular implementation behavior.

In other words, if you want to use such a mutable wrapper, you are better off with a loop implementing the particular operation. You already have the disadvantages of such a manual implementation, so why not implement it as a loop and gain its advantages as well.
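
For the question’s pipeline, such a loop could look roughly like this (a sketch using the question’s names, not an endorsement of the wrapper reuse itself):

Wrapper wrapper = new Wrapper();
for (Source s : list) {
    // reuse the single instance, without depending on Stream implementation details
    wrapper.setSource(s);
    processWrapper(wrapper);
}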


The other aspect to consider is what you gain from reusing such a mutable wrapper. It only works in loop-like usages where a temporary object might get optimized away by Escape Analysis anyway. In such scenarios, reusing objects, which extends their lifetime, may actually degrade performance.

Of course, Object Scalarization is not guaranteed behavior. There might be scenarios, like a long stream pipeline exceeding the JVM’s inlining limit, where the objects don’t get elided. But even then, temporary objects are not necessarily expensive.

This has been explained in this answer: temporary objects are cheaply allocated. The main costs of garbage collection are caused by objects which are still alive; these need to be traversed, and they need to be moved when making room for new allocations. The negative impact of temporary objects is that they may shorten the time between garbage collection cycles. But this is a function of allocation rate and available allocation space, so it is truly a problem that can be solved by throwing more RAM at it: more RAM means more time between GC cycles and more dead objects when a GC happens, which makes the net cost of the GC smaller.

Still, avoiding excessive allocation of temporary objects is a valid concern; the existence of IntStream, LongStream, and DoubleStream shows that. But those are special, as using primitive types is a viable alternative to using wrapper objects, without the disadvantages of reusing a mutable wrapper. They are also different in that they apply to problems where the primitive type and the wrapper type are semantically equivalent. In contrast, you want to solve a problem where the operation requires the wrapper type. And the same applies to the primitive streams: when you need the objects for your problem, there is no way around boxing, which will create distinct objects for distinct values rather than sharing a mutable object.
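
As an illustration of that difference (a hypothetical example, not taken from the question): if all you needed were, say, the lengths of the names, a primitive stream avoids creating any wrapper or boxed object at all.

int totalLength = list.stream()
    .mapToInt( s -> s.getName().length() )  // no Wrapper, no boxed Integer
    .sum();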

So if you likewise have a problem for which a semantically equivalent, wrapper-object-avoiding alternative without substantial drawbacks exists, like using Comparator.comparingInt instead of Comparator.comparing where feasible, you may still prefer it. But only then.
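
For example (again hypothetical, using the name length as an int sort key), the boxing-free variant would be:

list.sort( Comparator.comparingInt( s -> s.getName().length() ) );
// instead of the boxing variant:
// list.sort( Comparator.comparing( s -> s.getName().length() ) );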


In short, most of the time, the savings of such object reuse, if any, will not justify the disadvantages. In special cases, where it’s beneficial and matters, you may be better off with a loop or any other construct under your full control, instead of using a Stream.

answered Sep 27 '22 by Holger