
Closing streams in the middle of pipelines


When I execute this code which opens a lot of files during a stream pipeline:

public static void main(String[] args) throws IOException {
    Files.find(Paths.get("JAVA_DOCS_DIR/docs/api/"),
            100, (path, attr) -> path.toString().endsWith(".html"))
        .map(file -> runtimizeException(() -> Files.lines(file, StandardCharsets.ISO_8859_1)))
        .map(Stream::count)
        .forEachOrdered(System.out::println);
}

I get an exception:

java.nio.file.FileSystemException: /long/file/name: Too many open files 

The problem is that Stream.count does not close the stream when it is done traversing it. But I don't see why it shouldn't, given that it is a terminal operation. The same holds for other terminal operations such as reduce and forEach. flatMap on the other hand closes the streams it consists of.

The documentation tells me to use a try-with-resources statement to close streams if necessary. In my case I could replace the count line with something like this:

.map(s -> { long c = s.count(); s.close(); return c; } ) 

But that is noisy and ugly and could be a real inconvenience in some cases with big, complex pipelines.

So my questions are the following:

  1. Why were the streams not designed so that terminal operations close the streams they are working on? That would make them work better with IO streams.
  2. What is the best solution for closing IO streams in pipelines?

runtimizeException is a method that wraps checked exceptions in RuntimeExceptions.
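(The asker's runtimizeException is not shown in the question. A minimal sketch of what such a helper might look like, with a hypothetical ThrowingSupplier functional interface; both the interface name and the exact signature are assumptions, not the asker's code:)

// Hypothetical sketch; not the asker's actual implementation.
@FunctionalInterface
interface ThrowingSupplier<T> {
    T get() throws Exception;  // lets the lambda throw checked exceptions
}

static <T> T runtimizeException(ThrowingSupplier<T> supplier) {
    try {
        return supplier.get();
    } catch (RuntimeException e) {
        throw e;                        // pass unchecked exceptions through unchanged
    } catch (Exception e) {
        throw new RuntimeException(e);  // wrap checked exceptions
    }
}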

asked Apr 07 '14 by Lii


2 Answers

There are two issues here: handling of checked exceptions such as IOException, and timely closing of resources.

None of the predefined functional interfaces declare any checked exceptions, which means that they have to be handled within the lambda, or wrapped in an unchecked exception and rethrown. It looks like your runtimizeException function does that. You probably also had to declare your own functional interface for it. As you've probably discovered, this is a pain.

On the closing of resources like files, there was some investigation of having streams be closed automatically when the end of the stream was reached. This would be convenient, but it doesn't deal with closing when an exception is thrown. There's no magic do-the-right-thing mechanism for this in streams.

We're left with the standard Java techniques of dealing with resource closure, namely the try-with-resources construct introduced in Java 7. TWR really wants to have resources be closed at the same level in the call stack as they were opened. The principle of "whoever opens it has to close it" applies. TWR also deals with exception handling, which usually makes it convenient to deal with exception handling and resource closing in the same place.

In this example, the stream is somewhat unusual in that it maps a Stream<Path> to a Stream<Stream<String>>. These nested streams are the ones that aren't closed, resulting in the eventual exception when the system runs out of open file descriptors. What makes this difficult is that files are opened by one stream operation and then passed downstream; this makes it impossible to use TWR.

An alternative approach to structuring this pipeline is as follows.

The Files.lines call is the one that opens the file, so this has to be the resource in the TWR statement. The processing of this file is where (some) IOExceptions get thrown, so we can do the exception wrapping in the same TWR statement. This suggests having a simple function that maps the path to a line count, while handling resource closing and exception wrapping:

long lineCount(Path path) {
    try (Stream<String> s = Files.lines(path, StandardCharsets.ISO_8859_1)) {
        return s.count();
    } catch (IOException ioe) {
        throw new UncheckedIOException(ioe);
    }
}

Once you have this helper function, the main pipeline looks like this:

Files.find(Paths.get("JAVA_DOCS_DIR/docs/api/"),
           100, (path, attr) -> path.toString().endsWith(".html"))
     .mapToLong(this::lineCount)
     .forEachOrdered(System.out::println);
answered Oct 07 '22 by Stuart Marks


It is possible to create a utility method that reliably closes streams in the middle of a pipeline.

This makes sure that each resource is closed with a try-with-resources statement, avoids the need for a dedicated helper method for each operation, and is much less verbose than writing the try-statement directly in the lambda.

With this method the pipeline from the question looks like this:

Files.find(Paths.get("Java_8_API_docs/docs/api"), 100,
        (path, attr) -> path.toString().endsWith(".html"))
    .map(file -> applyAndClose(
        () -> Files.lines(file, StandardCharsets.ISO_8859_1),
        Stream::count))
    .forEachOrdered(System.out::println);

The implementation looks like this:

/**
 * Applies a function to a resource and closes it afterwards.
 * @param sup Supplier of the resource that should be closed
 * @param op operation that should be performed on the resource before it is closed
 * @return The result of calling op.apply on the resource
 */
private static <A extends AutoCloseable, B> B applyAndClose(Callable<A> sup, Function<A, B> op) {
    try (A res = sup.call()) {
        return op.apply(res);
    } catch (RuntimeException exc) {
        throw exc;
    } catch (Exception exc) {
        throw new RuntimeException("Wrapped in applyAndClose", exc);
    }
}

(Since resources that need to be closed often also throw checked exceptions when they are allocated, non-runtime exceptions are wrapped in runtime exceptions, which avoids the need for a separate method that does that.)
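As a further illustration (not part of the original answer, and using a made-up file path), the same helper can close any AutoCloseable resource after applying a function to it, also outside of a stream pipeline:

// Hedged example: read the first line of a file and close the reader afterwards.
// "some/file.txt" is a hypothetical path used only for illustration.
String firstLine = applyAndClose(
    () -> Files.newBufferedReader(Paths.get("some/file.txt")),
    reader -> reader.lines().findFirst().orElse(""));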

answered Oct 07 '22 by Lii