
Java parallel stream - order of invoking the parallel() method [closed]

AtomicInteger recordNumber = new AtomicInteger();
Files.lines(inputFile.toPath(), StandardCharsets.UTF_8)
     .map(record -> new Record(recordNumber.incrementAndGet(), record))
     .parallel()
     .filter(record -> doSomeOperation())
     .findFirst();

When I wrote this, I assumed that threads would be spawned only for the map call, since parallel() is placed after map(). But some lines in the file were getting different record numbers on every execution.

I read the official Java stream documentation and a few websites to understand how streams work under the hood.

A few questions:

  • Java parallel streams work based on Spliterator, which is implemented by every collection like ArrayList, LinkedList, etc. When we construct a parallel stream out of those collections, the corresponding spliterator is used to split and iterate the collection. This explains why parallelism happened at the original input source (the file's lines) level rather than at the result of map (i.e., the Record POJO). Is my understanding correct?

  • In my case, the input is a file I/O stream. Which spliterator will be used?

  • It doesn't matter where we place parallel() in the pipeline. The original input source will always be split, and the remaining intermediate operations will be applied.

    In this case, Java shouldn't allow users to place the parallel operation anywhere in the pipeline except at the original source, because it gives a wrong understanding to those who don't know how Java streams work internally. I know the parallel() operation is defined for the Stream object type, and so it works this way. But it would be better to provide some alternate solution.

  • In the above code snippet, I am trying to add a line number to every record in the input file, so the records should be ordered. However, I want to apply doSomeOperation() in parallel, as it is heavyweight logic. One way to achieve this is to write my own customized spliterator. Is there any other way?

asked Apr 12 '20 by explorer


People also ask

Does Java parallel stream maintain order?

If our Stream is ordered, it doesn't matter whether our data is being processed sequentially or in parallel; the implementation will maintain the encounter order of the Stream.

How do you maintain order in parallel stream?

Note: If we want the elements of a parallel stream to be processed in encounter order, we can use the forEachOrdered() method instead of the forEach() method.
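
A minimal sketch of the difference (the values are just illustrative; the interleaving of the first output varies between runs):

List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8);

// forEach on a parallel stream prints in whatever order threads happen to finish
numbers.parallelStream().forEach(n -> System.out.print(n));
System.out.println();

// forEachOrdered still runs upstream operations in parallel,
// but performs the terminal action in encounter order: 12345678
numbers.parallelStream().forEachOrdered(n -> System.out.print(n));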

What operations can be executed in parallel with Java parallel streams?

You can execute streams in serial or in parallel. When a stream executes in parallel, the Java runtime partitions the stream into multiple substreams. Aggregate operations iterate over and process these substreams in parallel and then combine the results.

Does stream process in order?

Streams may or may not have a defined encounter order. Whether or not a stream has an encounter order depends on the source and the intermediate operations. Certain stream sources (such as List or arrays) are intrinsically ordered, whereas others (such as HashSet) are not.
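
A quick way to see the difference between an ordered and an unordered source (the HashSet output depends on its internal iteration order, so it may differ on your machine):

List<Integer> ordered = List.of(3, 1, 2);         // List: intrinsically ordered
Set<Integer> unordered = new HashSet<>(ordered);  // HashSet: no encounter order

ordered.stream().forEach(System.out::print);      // always prints 312
unordered.stream().forEach(System.out::print);    // prints in whatever order the set iterates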


1 Answer

This explains why parallelism happened at the original input source (the file's lines) level rather than at the result of map (i.e., the Record POJO).

The entire stream is either parallel or sequential. We don't select a subset of operations to run sequentially or in parallel.

When the terminal operation is initiated, the stream pipeline is executed sequentially or in parallel depending on the mode of the stream on which it is invoked. (java.util.stream package documentation)

As you mention, parallel streams use spliterators. Clearly, this is to partition the data before the operations start running.
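
For example, an ArrayList's spliterator splits the backing array in half on each trySplit() call, before any map or filter runs (a small sketch; the exact split points are an implementation detail):

List<Integer> data = new ArrayList<>(List.of(1, 2, 3, 4, 5, 6, 7, 8));

Spliterator<Integer> right = data.spliterator();
Spliterator<Integer> left = right.trySplit();   // left takes the first half

left.forEachRemaining(System.out::print);    // 1234
System.out.println();
right.forEachRemaining(System.out::print);   // 5678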


In my case, the input is a file I/O stream. Which spliterator will be used?

Looking at the JDK source, I see it uses java.nio.file.FileChannelLinesSpliterator (at least when the file is on the default file system and the charset is a supported one such as UTF-8; otherwise Files.lines() falls back to a BufferedReader-based stream).
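
You can check this for yourself by asking the source stream for its spliterator (the class name is an internal detail and may differ between JDK versions; the file path here is just a placeholder):

try (Stream<String> lines = Files.lines(Path.of("input.txt"), StandardCharsets.UTF_8)) {
    System.out.println(lines.spliterator().getClass());
    // prints: class java.nio.file.FileChannelLinesSpliterator (on recent JDKs)
}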


It doesn't matter where we place parallel() in the pipeline. The original input source will always be split, and the remaining intermediate operations will be applied.

Right. You can even call parallel() and sequential() multiple times; the one invoked last wins. When we call parallel(), we set that mode for the whole pipeline, and as stated above, all operations then run either sequentially or in parallel.
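
A small experiment that shows this (thread names will vary; the point is that map() runs on ForkJoinPool workers even though parallel() comes after it):

List.of("a", "b", "c", "d").stream()
    .map(s -> {
        System.out.println(s + " mapped on " + Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .parallel()   // placed after map(), yet map() still runs in parallel
    .forEach(s -> { });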


In this case, Java shouldn't allow users to place the parallel operation anywhere in the pipeline except at the original source...

This is a matter of opinion. I think Zabuza gives a good reason to support the JDK designers' choice.


One way to achieve this is to write my own customized spliterator. Is there any other way?

This depends on your operations:

  • If findFirst() is your real terminal operation, then you don't even need to worry about parallel execution, because there won't be many calls to doSomeOperation() anyway (findFirst() is short-circuiting). In fact, .parallel() may cause more than one element to be processed, whereas findFirst() on a sequential stream would prevent that.
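    For illustration, the sequential version of the question's pipeline (using the same names as in the question's snippet) stops reading as soon as the filter matches:

    Optional<Record> first = Files.lines(inputFile.toPath(), StandardCharsets.UTF_8)
        .map(line -> new Record(recordNumber.incrementAndGet(), line))
        .filter(record -> doSomeOperation())
        .findFirst();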
  • If your terminal operation doesn't create much data, then maybe you can create your Record objects using a sequential stream, then process the result in parallel:

    AtomicInteger recordNumber = new AtomicInteger();

    // number the lines sequentially first, then run the heavy filter in parallel
    List<Record> smallData = Files.lines(inputFile.toPath(),
                                         StandardCharsets.UTF_8)
        .map(line -> new Record(recordNumber.incrementAndGet(), line))
        .collect(Collectors.toList())
        .parallelStream()
        .filter(record -> doSomeOperation())
        .collect(Collectors.toList());
    
  • If your pipeline would load a lot of data in memory (which may be the reason you're using Files.lines()), then perhaps you'll need a custom spliterator. Before I go there, though, I'd look into other options (such as saving the lines with an id column to start with - that's just my opinion).
    I'd also attempt to process records in smaller batches, like this:

    AtomicInteger recordNumber = new AtomicInteger();
    final int batchSize = 10;

    try (BufferedReader reader = Files.newBufferedReader(inputFile.toPath(),
            StandardCharsets.UTF_8)) {
        // reads up to batchSize lines per call; returns a short (or empty) batch at EOF
        Supplier<List<Record>> batchSupplier = () -> {
            List<Record> batch = new ArrayList<>();
            for (int i = 0; i < batchSize; i++) {
                String nextLine;
                try {
                    nextLine = reader.readLine();
                } catch (IOException e) {
                    // handle the exception appropriately
                    throw new UncheckedIOException(e);
                }

                if (nextLine == null)
                    return batch;
                batch.add(new Record(recordNumber.getAndIncrement(), nextLine));
            }
            return batch;
        };

        Stream.generate(batchSupplier)
            .takeWhile(batch -> !batch.isEmpty())   // stop at EOF without dropping a final partial batch
            .map(batch -> batch.parallelStream()
                               .filter(record -> doSomeOperation())
                               .collect(Collectors.toList()))
            .flatMap(List::stream)
            .forEach(System.out::println);
    }
    

    This executes doSomeOperation() in parallel without loading all the data into memory. But note that the right batchSize will need some thought. (Stream.takeWhile requires Java 9 or later.)

answered Oct 20 '22 by ernest_k