
Folding a sequential stream in Java

I'm used to programming in Scala, but I have to write some Java, and I'm trying to perform the equivalent of the following Scala snippet:

trait Options[K, V] {
  def add(key: K, value: V): Options[K, V]
}

val options: Options[T, U] = ???
val elems: List[(T, U)] = ???
elems.foldLeft(options) {
  case (opts, (key, value)) => opts.add(key, value)
}

That is, I'm folding the elements of elems into options, producing a new instance at each step.

I tried to use Java's Stream#reduce:

interface Options<K, V> {
  Options<K, V> add(K key, V value);
}

Options<K, V> options = /* ... */;
Stream<Tuple2<K, V>> elems = /* ... */; // This is Reactor's Tuple2
elems.reduce(options, (opts, opt) -> opts.add(opt.getT1(), opt.getT2()), /* ??? */);

I don't know what the combiner should be, and I'm having trouble imagining what values its arguments will have. My understanding is that the combiner is used to combine intermediate results produced in parallel in a parallel stream. I do not care at all about processing elems in parallel in my case. In other words, I'm looking for a synchronous, sequential version of Flux#reduce.

I have no control over the API of Options. elems does not need to be a Stream.

asked Jan 28 '19 15:01 by Martin




1 Answer

It's not possible to write a combiner with the interface you've provided. A combiner needs a way to combine two Options, but there is no way to do that: the only thing anyone can do with an Options instance is add one pair to it. Nobody can get any information out of it, so it presumably can't do anything very useful.

Perhaps this issue stems from the fact that Java does not have traits, and Java interfaces are not a full substitute for them.

The idiomatic Java way to write this is just a bog-standard for-loop:

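// Pair: any pair type with getKey()/getValue(), e.g. Apache Commons Lang's
// org.apache.commons.lang3.tuple.Pair
Options<String, String> options = /*whatever*/;
List<Pair<String, String>> elems = /*whatever*/;
for (Pair<String, String> pair : elems)
{
    // add returns a new instance, so reassign the accumulator on every step
    options = options.add(pair.getKey(), pair.getValue());
}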
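If this loop comes up often, it can be wrapped in a small generic helper that behaves like Scala's foldLeft. This is only a sketch, not a JDK method: foldLeft is a hypothetical name, and StringOptions is a made-up immutable Options implementation that exists only so the example runs (records need Java 16 or later).

```java
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

public class FoldLeftDemo {
    // The question's interface: add returns a new instance
    interface Options<K, V> {
        Options<K, V> add(K key, V value);
    }

    // Hypothetical immutable implementation, for illustration only
    record StringOptions(String text) implements Options<String, String> {
        public Options<String, String> add(String key, String value) {
            return new StringOptions(text + key + "=" + value + ";");
        }
    }

    // Hypothetical helper: a sequential left fold that threads the accumulator
    // through the elements in iteration order. No combiner is involved.
    static <T, R> R foldLeft(Iterable<? extends T> elems, R initial,
                             BiFunction<? super R, ? super T, ? extends R> step) {
        R acc = initial;
        for (T elem : elems) {
            acc = step.apply(acc, elem);
        }
        return acc;
    }

    static String demo() {
        Options<String, String> options = new StringOptions("");
        List<Map.Entry<String, String>> elems =
                List.of(Map.entry("a", "1"), Map.entry("b", "2"));
        Options<String, String> result =
                foldLeft(elems, options, (opts, pair) -> opts.add(pair.getKey(), pair.getValue()));
        return ((StringOptions) result).text();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints a=1;b=2;
    }
}
```

Because the helper accepts any Iterable, elems does not have to be a Stream, and the question of what the combiner should be never arises.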

If you can live with never being able to use a parallel stream, you can take advantage of the fact that, in practice, a sequential stream never invokes the combiner. You can therefore supply a combiner that just throws an exception.

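// reduce rather than collect: add returns a new instance, and collect's
// BiConsumer accumulator would silently discard it
Options<String, String> foo = elems.stream()
    .reduce(
        options,
        (opt, pair) -> opt.add(pair.getKey(), pair.getValue()),
        (a, b) -> { throw new UnsupportedOperationException("sequential only"); }
    );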
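The claim that a sequential stream never calls the combiner can be checked with a small experiment. The sketch below (CombinerCalls is a hypothetical class name) sums numbers with the same three-argument reduce overload and counts how often the combiner runs. On OpenJDK the sequential count is 0, while the parallel run calls the combiner to merge partial sums computed for separate chunks of the input; each argument it receives is one such partial result. This is observed implementation behavior rather than something the Stream specification promises.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class CombinerCalls {
    record Result(long sum, int combinerCalls) {}

    // Sums 1..n with the three-argument reduce and counts combiner invocations
    static Result sumAndCount(int n, boolean parallel) {
        AtomicInteger calls = new AtomicInteger(); // thread-safe: the combiner may run on pool threads
        Stream<Integer> numbers = IntStream.rangeClosed(1, n).boxed();
        if (parallel) {
            numbers = numbers.parallel();
        }
        long sum = numbers.reduce(
                0L,                      // a true identity for addition
                (acc, x) -> acc + x,     // accumulator: fold one element into a partial sum
                (a, b) -> {              // combiner: merge two partial sums
                    calls.incrementAndGet();
                    return a + b;
                });
        return new Result(sum, calls.get());
    }

    public static void main(String[] args) {
        System.out.println("sequential: " + sumAndCount(10_000, false));
        System.out.println("parallel:   " + sumAndCount(10_000, true));
    }
}
```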

If you want to use reduce with a combiner that actually works, for example so that a parallel stream becomes possible, you'll need to modify your interface to either expose some information about the key-value pairs it contains or provide a means to add more than one key-value pair at once. For example:

interface Options<K, V>
{
    Options<K, V> add(K key, V value);
    Options<K, V> add(Options<K, V> otherOptions);
}

Options<String, String> options = /*whatever*/;
List<Pair<String, String>> elems = /*whatever*/;

Options<String, String> foo = elems.stream()
    .reduce(
        options,
        (opt, pair) -> opt.add(pair.getKey(), pair.getValue()),
        Options::add
    );
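One caveat with that version: Stream.reduce requires its first argument to be an identity for the combiner, and in a parallel stream every chunk starts from it. If options already contains pairs, they can end up merged more than once. A minimal sketch, assuming a hypothetical immutable ListOptions class that is not part of the original API (Java 16 or later), reduces from an empty instance and merges the pre-existing options once at the end.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ParallelReduceDemo {
    // The modified interface from the answer
    interface Options<K, V> {
        Options<K, V> add(K key, V value);
        Options<K, V> add(Options<K, V> otherOptions);
    }

    // Hypothetical immutable implementation backed by a list of entries (illustration only)
    record ListOptions<K, V>(List<Map.Entry<K, V>> entries) implements Options<K, V> {
        ListOptions {
            entries = List.copyOf(entries); // defensive, unmodifiable copy
        }

        static <K, V> ListOptions<K, V> empty() {
            return new ListOptions<>(List.of());
        }

        public Options<K, V> add(K key, V value) {
            List<Map.Entry<K, V>> copy = new ArrayList<>(entries);
            copy.add(Map.entry(key, value));
            return new ListOptions<>(copy);
        }

        public Options<K, V> add(Options<K, V> other) {
            // This sketch can only merge instances of its own class
            List<Map.Entry<K, V>> copy = new ArrayList<>(entries);
            copy.addAll(((ListOptions<K, V>) other).entries());
            return new ListOptions<>(copy);
        }
    }

    static List<String> demo() {
        Options<String, String> options = ListOptions.<String, String>empty().add("a", "1");
        List<Map.Entry<String, String>> elems = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            elems.add(Map.entry("k" + i, "v" + i));
        }
        Options<String, String> empty = ListOptions.empty(); // the identity must contribute nothing
        Options<String, String> fromElems = elems.parallelStream()
                .reduce(empty,
                        (opt, pair) -> opt.add(pair.getKey(), pair.getValue()),
                        Options::add); // resolves to add(Options): concatenation is associative
        Options<String, String> result = options.add(fromElems); // merge the initial pairs exactly once
        return ((ListOptions<String, String>) result).entries().stream()
                .map(Map.Entry::getKey)
                .toList();
    }

    public static void main(String[] args) {
        System.out.println(demo().size()); // prints 1001
    }
}
```

Since the stream is ordered and the combiner is associative, the parallel result keeps the encounter order of elems.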

I doubt that's what you wanted to hear, but Scala and Java are different languages. You shouldn't expect everything to have an exact parallel. If it did, there would be no reason for both languages to exist in the first place.

answered Sep 21 '22 02:09 by Michael
