Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Can you split a stream into two streams?

People also ask

Can a stream be used multiple times?

From the documentation: A stream should be operated on (invoking an intermediate or terminal stream operation) only once. A stream implementation may throw IllegalStateException if it detects that the stream is being reused. So the answer is no, streams are not meant to be reused.

How do you add two streams?

concat() in Java. Stream. concat() method creates a concatenated stream in which the elements are all the elements of the first stream followed by all the elements of the second stream. The resulting stream is ordered if both of the input streams are ordered, and parallel if either of the input streams is parallel.

What are Streams in Java?

A stream is a sequence of objects that supports various methods which can be pipelined to produce the desired result. The features of Java stream are – A stream is not a data structure instead it takes input from the Collections, Arrays or I/O channels.


A collector can be used for this.

  • For two categories, use Collectors.partitioningBy() factory.

This will create a Map<Boolean, List>, and put items in one or the other list based on a Predicate.

Note: Since the stream needs to be consumed whole, this can't work on infinite streams. And because the stream is consumed anyway, this method simply puts them in Lists instead of making a new stream-with-memory. You can always stream those lists if you require streams as output.

Also, no need for the iterator, not even in the heads-only example you provided.

  • Binary splitting looks like this:
Random r = new Random();

Map<Boolean, List<String>> groups = stream
    .collect(Collectors.partitioningBy(x -> r.nextBoolean()));

System.out.println(groups.get(false).size());
System.out.println(groups.get(true).size());
  • For more categories, use a Collectors.groupingBy() factory.
Map<Object, List<String>> groups = stream
    .collect(Collectors.groupingBy(x -> r.nextInt(3)));
System.out.println(groups.get(0).size());
System.out.println(groups.get(1).size());
System.out.println(groups.get(2).size());

In case the streams are not Stream, but one of the primitive streams like IntStream, then this .collect(Collectors) method is not available. You'll have to do it the manual way without a collector factory. It's implementation looks like this:

[Example 2.0 since 2020-04-16]

    IntStream    intStream = IntStream.iterate(0, i -> i + 1).limit(100000).parallel();
    IntPredicate predicate = ignored -> r.nextBoolean();

    Map<Boolean, List<Integer>> groups = intStream.collect(
            () -> Map.of(false, new ArrayList<>(100000),
                         true , new ArrayList<>(100000)),
            (map, value) -> map.get(predicate.test(value)).add(value),
            (map1, map2) -> {
                map1.get(false).addAll(map2.get(false));
                map1.get(true ).addAll(map2.get(true ));
            });

In this example I initialize the ArrayLists with the full size of the initial collection (if this is known at all). This prevents resize events even in the worst-case scenario, but can potentially gobble up 2NT space (N = initial number of elements, T = number of threads). To trade-off space for speed, you can leave it out or use your best educated guess, like the expected highest number of elements in one partition (typically just over N/2 for a balanced split).

I hope I don't offend anyone by using a Java 9 method. For the Java 8 version, look at the edit history.


I stumbled across this question to my self and I feel that a forked stream has some use cases that could prove valid. I wrote the code below as a consumer so that it does not do anything but you could apply it to functions and anything else you might come across.

class PredicateSplitterConsumer<T> implements Consumer<T>
{
  private Predicate<T> predicate;
  private Consumer<T>  positiveConsumer;
  private Consumer<T>  negativeConsumer;

  public PredicateSplitterConsumer(Predicate<T> predicate, Consumer<T> positive, Consumer<T> negative)
  {
    this.predicate = predicate;
    this.positiveConsumer = positive;
    this.negativeConsumer = negative;
  }

  @Override
  public void accept(T t)
  {
    if (predicate.test(t))
    {
      positiveConsumer.accept(t);
    }
    else
    {
      negativeConsumer.accept(t);
    }
  }
}

Now your code implementation could be something like this:

personsArray.forEach(
        new PredicateSplitterConsumer<>(
            person -> person.getDateOfBirth().isPresent(),
            person -> System.out.println(person.getName()),
            person -> System.out.println(person.getName() + " does not have Date of birth")));

Unfortunately, what you ask for is directly frowned upon in the JavaDoc of Stream:

A stream should be operated on (invoking an intermediate or terminal stream operation) only once. This rules out, for example, "forked" streams, where the same source feeds two or more pipelines, or multiple traversals of the same stream.

You can work around this using peek or other methods should you truly desire that type of behaviour. In this case, what you should do is instead of trying to back two streams from the same original Stream source with a forking filter, you would duplicate your stream and filter each of the duplicates appropriately.

However, you may wish to reconsider if a Stream is the appropriate structure for your use case.


Not exactly. You can't get two Streams out of one; this doesn't make sense -- how would you iterate over one without needing to generate the other at the same time? A stream can only be operated over once.

However, if you want to dump them into a list or something, you could do

stream.forEach((x) -> ((x == 0) ? heads : tails).add(x));

This is against the general mechanism of Stream. Say you can split Stream S0 to Sa and Sb like you wanted. Performing any terminal operation, say count(), on Sa will necessarily "consume" all elements in S0. Therefore Sb lost its data source.

Previously, Stream had a tee() method, I think, which duplicate a stream to two. It's removed now.

Stream has a peek() method though, you might be able to use it to achieve your requirements.