
What is the most efficient way to make a copy of a stream?

I have a method which performs processing on a stream. Part of that processing needs to be done under the control of a lock (one locked section covering all the elements), but the rest doesn't need the lock, and shouldn't hold it because it might be quite time-consuming. So I can't just say:

Stream<V> preprocessed = Stream.of(objects).map(this::preProcess);
Stream<V> toPostProcess;
synchronized (lockObj) {
    toPostProcess = preprocessed.map(this::doLockedProcessing);
}
toPostProcess.map(this::postProcess).forEach(System.out::println);

because the calls to doLockedProcessing would only be executed when the terminal operation forEach is invoked, and that is outside the lock.
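To illustrate the laziness described above, here is a minimal sketch that records when each step actually runs. The class name LazyMapDemo and the event strings are made up for this illustration; Thread.holdsLock is used only to show whether the mapping function executes while the lock is held.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class LazyMapDemo {
    // Returns the sequence of events in the order they actually happened.
    static List<String> trace() {
        List<String> events = new ArrayList<>();
        Object lockObj = new Object();
        Stream<String> mapped;
        synchronized (lockObj) {
            events.add("entered synchronized block");
            // map() only registers the function; nothing is executed yet.
            mapped = Stream.of("a", "b").map(s -> {
                events.add("map " + s + ", holdsLock=" + Thread.holdsLock(lockObj));
                return s.toUpperCase();
            });
            events.add("leaving synchronized block");
        }
        // The terminal operation triggers the mapping, outside the lock.
        mapped.forEach(s -> events.add("consume " + s));
        return events;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```

In this sketch every "map" event is recorded after "leaving synchronized block" and reports holdsLock=false, which is exactly the problem with the snippet above.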

So I think I need to make a copy of the stream, using a terminal operation, at each stage so that the right bits are done at the right time. Something like:

Stream<V> preprocessed = Stream.of(objects).map(this::preProcess).copy();
Stream<V> toPostProcess;
synchronized (lockObj) {
    toPostProcess = preprocessed.map(this::doLockedProcessing).copy();
}
toPostProcess.map(this::postProcess).forEach(System.out::println);

Of course, the copy() method doesn't exist, but if it did it would perform a terminal operation on the stream and return a new stream containing all the same elements.

I'm aware of a few ways of achieving this:

(1) Via an array (not so easy if the element type is a generic type):

copy = Stream.of(stream.toArray(String[]::new));

(2) Via a list:

copy = stream.collect(Collectors.toList()).stream();

(3) Via a stream builder:

Stream.Builder<V> builder = Stream.builder();
stream.forEach(builder);
copy = builder.build();
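For option (1) with a generic element type, one possible workaround is to go through an Object[] and cast each element back. This is only a sketch: copyViaArray and ArrayCopyDemo are hypothetical names, the cast is unchecked, and the extra map stage adds a small per-element cost.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ArrayCopyDemo {
    // Hypothetical helper: copies a stream of any element type through an Object[].
    @SuppressWarnings("unchecked")
    static <T> Stream<T> copyViaArray(Stream<T> source) {
        // toArray() is a terminal operation, so the source is fully consumed here.
        Object[] elements = source.toArray();
        // Unchecked cast; safe in practice because every element came from a Stream<T>.
        return Arrays.stream(elements).map(e -> (T) e);
    }

    public static void main(String[] args) {
        Stream<List<String>> copy =
                copyViaArray(Stream.of(Arrays.asList("a"), Arrays.asList("b", "c")));
        System.out.println(copy.collect(Collectors.toList()));
    }
}
```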

What I want to know is: which of these methods is the most efficient in terms of time and memory? Or is there another way which is better?

Jeremy Hicks asked Jan 22 '19 14:01


2 Answers

I think you have already mentioned all possible options. There's no other structural way to do what you need. First, you'd have to consume the original stream. Then, create a new stream, acquire the lock and consume this new stream (thus invoking your locked operation). Finally, create a yet newer stream, release the lock and go on processing this newer stream.

Of the options you are considering, I would use the third one, because the number of elements it can handle is limited only by memory. It has no implicit maximum size, unlike e.g. ArrayList, which can hold at most about Integer.MAX_VALUE elements.

Needless to say, this is a fairly expensive operation in both time and space. You could do it as follows:

Stream<V> temp = Stream.of(objects)
        .map(this::preProcess)
        .collect(Stream::<V>builder,
                 Stream.Builder::accept,
                 (b1, b2) -> b2.build().forEach(b1))
        .build();

synchronized (lockObj) {
    temp = temp
            .map(this::doLockedProcessing)
            .collect(Stream::<V>builder,
                     Stream.Builder::accept,
                     (b1, b2) -> b2.build().forEach(b1))
            .build();
}

temp.map(this::postProcess).forEach(System.out::println);

Note that I've used a single Stream instance temp, so that intermediate streams (and their builders) can be garbage-collected, if needed.


As suggested by @Eugene in the comments, it would be nice to have a utility method to avoid code duplication. Here's such a method:

public static <T> Stream<T> copy(Stream<T> source) {
    return source.collect(Stream::<T>builder,
                          Stream.Builder::accept,
                          (b1, b2) -> b2.build().forEach(b1))
                 .build();
}

Then, you could use this method as follows:

Stream<V> temp = copy(Stream.of(objects).map(this::preProcess));

synchronized (lockObj) {
    temp = copy(temp.map(this::doLockedProcessing));
}

temp.map(this::postProcess).forEach(System.out::println);
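As a self-contained check of the three-phase approach, the following sketch logs whether the lock is held while each stage runs. The class LockedStageDemo and the log strings are invented for this illustration; the copy method is the one shown above, and the lambdas stand in for preProcess, doLockedProcessing and postProcess.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class LockedStageDemo {
    // Same utility as above: consumes the source and returns a new stream of its elements.
    static <T> Stream<T> copy(Stream<T> source) {
        return source.collect(Stream::<T>builder,
                              Stream.Builder::accept,
                              (b1, b2) -> b2.build().forEach(b1))
                     .build();
    }

    // Runs the three stages and records, per element, whether the lock was held.
    static List<String> run() {
        Object lockObj = new Object();
        List<String> log = new ArrayList<>();

        Stream<String> temp = copy(Stream.of("a", "b").map(s -> {
            log.add("pre " + s + " locked=" + Thread.holdsLock(lockObj));
            return s;
        }));

        synchronized (lockObj) {
            // copy() is terminal, so the mapping runs here, inside the lock.
            temp = copy(temp.map(s -> {
                log.add("locked " + s + " locked=" + Thread.holdsLock(lockObj));
                return s;
            }));
        }

        temp.forEach(s -> log.add("post " + s + " locked=" + Thread.holdsLock(lockObj)));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

With a sequential stream, only the two "locked" entries report locked=true; the "pre" and "post" entries run without the lock.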
fps answered Oct 05 '22 11:10


I created a benchmark test which compares the three methods. This suggested that using a List as the intermediate store is about 30% slower than using an array or a Stream.Builder, which are similar. I am therefore drawn to using a Stream.Builder because converting to an array is tricky where the element type is a generic type.
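The benchmark itself is not reproduced here. As a rough sketch of how such a comparison might be set up, the following times the three approaches with System.nanoTime. All names (CopyTimingSketch, averageNanos, the sizes and round counts) are assumptions made for this illustration, and timing by hand like this is crude; a harness such as JMH would give more trustworthy numbers.

```java
import java.util.Arrays;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class CopyTimingSketch {
    static Stream<Integer> viaArray(Stream<Integer> s) {
        return Arrays.stream(s.toArray(Integer[]::new));
    }

    static Stream<Integer> viaList(Stream<Integer> s) {
        return s.collect(Collectors.toList()).stream();
    }

    static Stream<Integer> viaBuilder(Stream<Integer> s) {
        Stream.Builder<Integer> builder = Stream.builder();
        s.forEach(builder);
        return builder.build();
    }

    // Copies a fresh stream of `size` elements `rounds` times and
    // returns the average time per copy in nanoseconds.
    static long averageNanos(UnaryOperator<Stream<Integer>> copier, int size, int rounds) {
        long copied = 0;
        long start = System.nanoTime();
        for (int i = 0; i < rounds; i++) {
            copied += copier.apply(IntStream.range(0, size).boxed()).count();
        }
        long elapsed = System.nanoTime() - start;
        // Sanity check so the copies cannot be silently skipped or truncated.
        if (copied != (long) size * rounds) {
            throw new IllegalStateException("copy lost elements");
        }
        return elapsed / rounds;
    }

    public static void main(String[] args) {
        int size = 100_000;
        // Warm-up rounds so the JIT has a chance to compile the hot paths.
        averageNanos(CopyTimingSketch::viaArray, size, 50);
        averageNanos(CopyTimingSketch::viaList, size, 50);
        averageNanos(CopyTimingSketch::viaBuilder, size, 50);

        System.out.println("array:   " + averageNanos(CopyTimingSketch::viaArray, size, 200) + " ns");
        System.out.println("list:    " + averageNanos(CopyTimingSketch::viaList, size, 200) + " ns");
        System.out.println("builder: " + averageNanos(CopyTimingSketch::viaBuilder, size, 200) + " ns");
    }
}
```

The figures printed will vary with the JVM, element count and hardware, so they should be read as indicative only.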

I've ended up writing a little function that creates a Collector which uses a Stream.Builder as the intermediate store:

private static <T> Collector<T, Stream.Builder<T>, Stream<T>> copyCollector()
{
    return Collector.of(Stream::builder, Stream.Builder::add, (b1, b2) -> {
        b2.build().forEach(b1);
        return b1;
    }, Stream.Builder::build);
}

I can then make a copy of any stream str by doing str.collect(copyCollector()) which feels quite in keeping with the idiomatic usage of streams.

The original code I posted would then look like this:

Stream<V> preprocessed = Stream.of(objects).map(this::preProcess).collect(copyCollector());
Stream<V> toPostProcess;
synchronized (lockObj) {
    toPostProcess = preprocessed.map(this::doLockedProcessing).collect(copyCollector());
}
toPostProcess.map(this::postProcess).forEach(System.out::println);
Jeremy Hicks answered Oct 05 '22 12:10