When I execute this code which opens a lot of files during a stream pipeline:
public static void main(String[] args) throws IOException {
    Files.find(Paths.get("JAVA_DOCS_DIR/docs/api/"),
               100,
               (path, attr) -> path.toString().endsWith(".html"))
        .map(file -> runtimizeException(() -> Files.lines(file, StandardCharsets.ISO_8859_1)))
        .map(Stream::count)
        .forEachOrdered(System.out::println);
}
I get an exception:
java.nio.file.FileSystemException: /long/file/name: Too many open files
The problem is that Stream.count does not close the stream when it is done traversing it. But I don't see why it shouldn't, given that it is a terminal operation. The same holds for other terminal operations such as reduce and forEach. flatMap, on the other hand, closes the streams it consists of.
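The difference described above can be observed with in-memory streams. The following is a minimal sketch, not code from the question: the class and method names are hypothetical, and onClose handlers stand in for file handles so that the number of closed inner streams can be counted.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

// Hypothetical demo class; onClose handlers stand in for real file handles.
class CloseBehaviorDemo {

    /** Counts each inner stream with a terminal operation and reports how many were closed. */
    static int closedAfterCount() {
        AtomicInteger closed = new AtomicInteger();
        Stream.of("a", "b", "c")
              .map(x -> Stream.of(x).onClose(closed::incrementAndGet))
              .mapToLong(Stream::count)   // terminal operation on each inner stream
              .sum();
        return closed.get();
    }

    /** Flattens the inner streams with flatMap and reports how many were closed. */
    static int closedAfterFlatMap() {
        AtomicInteger closed = new AtomicInteger();
        Stream.of("a", "b", "c")
              .flatMap(x -> Stream.of(x).onClose(closed::incrementAndGet))
              .count();
        return closed.get();
    }

    public static void main(String[] args) {
        System.out.println("closed after count():  " + closedAfterCount());
        System.out.println("closed after flatMap(): " + closedAfterFlatMap());
    }
}
```

With count, none of the close handlers run. With flatMap, each inner stream is closed once its contents have been consumed.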
The documentation tells me to use a try-with-resources statement to close streams if necessary. In my case I could replace the count line with something like this:
.map(s -> {
    long c = s.count();
    s.close();
    return c;
})
But that is noisy and ugly and could be a real inconvenience in some cases with big, complex pipelines.
So my questions are the following:
runtimizeException is a method that wraps checked exceptions in RuntimeExceptions.
If the stream would instead be closed implicitly in the final operation, the close method would be called once, if an IOException occurs, and twice if the operation completes successfully. This might be a plausible explanation.
If you don't close streams, you may have problems opening them back up again. This is especially true if they're hanging off the end of sockets. Closing a stream also makes sure that data is flushed through the stream if there is any data left to send.
filter, sorted, and map, which can be connected together to form a pipeline. collect, which closed the pipeline and returned a result.
Streams have a BaseStream.close() method and implement AutoCloseable, but nearly all stream instances do not actually need to be closed after use. Generally, only streams whose source is an IO channel (such as those returned by Files.lines(Path, Charset)) will require closing.
There are two issues here: handling of checked exceptions such as IOException, and timely closing of resources.
None of the predefined functional interfaces declare any checked exceptions, which means that they have to be handled within the lambda, or wrapped in an unchecked exception and rethrown. It looks like your runtimizeException function does that. You probably also had to declare your own functional interface for it. As you've probably discovered, this is a pain.
On the closing of resources like files, there was some investigation of having streams be closed automatically when the end of the stream was reached. This would be convenient, but it doesn't deal with closing when an exception is thrown. There's no magic do-the-right-thing mechanism for this in streams.
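To illustrate why closing at end-of-stream would not be enough, here is a small sketch in which the terminal operation fails before the end of the stream is reached. The class and method names are made up for this example, and an onClose handler stands in for a real resource. Only the surrounding try-with-resources statement guarantees that the handler runs.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

// Hypothetical demo class; the onClose handler stands in for releasing a file.
class CloseOnFailureDemo {

    /** Fails midway through a terminal operation and reports whether the stream was still closed. */
    static boolean closedDespiteException() {
        AtomicBoolean closed = new AtomicBoolean(false);
        try (Stream<String> s = Stream.of("a", "b", "c").onClose(() -> closed.set(true))) {
            s.forEach(x -> {
                if (x.equals("b")) {
                    throw new IllegalStateException("simulated failure at " + x);
                }
            });
        } catch (IllegalStateException expected) {
            // The end of the stream was never reached, yet try-with-resources closed it.
        }
        return closed.get();
    }

    public static void main(String[] args) {
        System.out.println("closed despite exception: " + closedDespiteException());
    }
}
```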
We're left with the standard Java techniques of dealing with resource closure, namely the try-with-resources construct introduced in Java 7. TWR really wants to have resources be closed at the same level in the call stack as they were opened. The principle of "whoever opens it has to close it" applies. TWR also deals with exception handling, which usually makes it convenient to deal with exception handling and resource closing in the same place.
In this example, the stream is somewhat unusual in that it maps a Stream<Path> to a Stream<Stream<String>>. These nested streams are the ones that aren't closed, resulting in the eventual exception when the system runs out of open file descriptors. What makes this difficult is that files are opened by one stream operation and then passed downstream; this makes it impossible to use TWR.
An alternative approach to structuring this pipeline is as follows.
The Files.lines call is the one that opens the file, so this has to be the resource in the TWR statement. The processing of this file is where (some) IOExceptions get thrown, so we can do the exception wrapping in the same TWR statement. This suggests having a simple function that maps the path to a line count, while handling resource closing and exception wrapping:
long lineCount(Path path) {
    try (Stream<String> s = Files.lines(path, StandardCharsets.ISO_8859_1)) {
        return s.count();
    } catch (IOException ioe) {
        throw new UncheckedIOException(ioe);
    }
}
Once you have this helper function, the main pipeline looks like this:
Files.find(Paths.get("JAVA_DOCS_DIR/docs/api/"),
           100,
           (path, attr) -> path.toString().endsWith(".html"))
     .mapToLong(this::lineCount)
     .forEachOrdered(System.out::println);
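Put together as a complete program, the helper approach might look like the sketch below. It makes a few assumptions beyond the snippets above: the class name HtmlLineCounter and the method htmlLineCounts are invented for this example, lineCount is made static so it can be called from main, the root directory is taken from the command line, and the paths are sorted so the output order is deterministic. The stream returned by Files.find also holds open directories, so it is wrapped in its own try-with-resources statement here.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical wrapper class for the helper-method approach.
class HtmlLineCounter {

    /** Opens the file, counts its lines, and closes it again, wrapping any IOException. */
    static long lineCount(Path path) {
        try (Stream<String> lines = Files.lines(path, StandardCharsets.ISO_8859_1)) {
            return lines.count();
        } catch (IOException ioe) {
            throw new UncheckedIOException(ioe);
        }
    }

    /** Returns the line count of every .html file under root, ordered by path. */
    static List<Long> htmlLineCounts(Path root) throws IOException {
        // Files.find opens directories as it walks, so its stream is closed here as well.
        try (Stream<Path> files = Files.find(root, 100,
                (path, attr) -> path.toString().endsWith(".html"))) {
            return files.sorted()   // sorting is an addition, for a stable output order
                        .mapToLong(HtmlLineCounter::lineCount)
                        .boxed()
                        .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        // args[0] is assumed to be the directory to scan.
        htmlLineCounts(Paths.get(args[0])).forEach(System.out::println);
    }
}
```

At most one file is open at a time, because each file is closed inside lineCount before the pipeline moves on to the next path.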
It is possible to create a utility method that reliably closes streams in the middle of a pipeline.
This makes sure that each resource is closed with a try-with-resources statement, but avoids the need for a dedicated helper method for each operation, and is much less verbose than writing the try statement directly in the lambda.
With this method the pipeline from the question looks like this:
Files.find(Paths.get("Java_8_API_docs/docs/api"),
           100,
           (path, attr) -> path.toString().endsWith(".html"))
     .map(file -> applyAndClose(
         () -> Files.lines(file, StandardCharsets.ISO_8859_1),
         Stream::count))
     .forEachOrdered(System.out::println);
The implementation looks like this:
/**
 * Applies a function to a resource and closes it afterwards.
 * @param sup Supplier of the resource that should be closed
 * @param op operation that should be performed on the resource before it is closed
 * @return The result of calling op.apply on the resource
 */
private static <A extends AutoCloseable, B> B applyAndClose(Callable<A> sup, Function<A, B> op) {
    try (A res = sup.call()) {
        return op.apply(res);
    } catch (RuntimeException exc) {
        throw exc;
    } catch (Exception exc) {
        throw new RuntimeException("Wrapped in applyAndClose", exc);
    }
}
(Since resources that need to be closed often also throw exceptions when they are allocated, non-runtime exceptions are wrapped in runtime exceptions, avoiding the need for a separate method that does that.)
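As a rough check that applyAndClose closes the resource both on success and on failure, the sketch below repeats the method in a self-contained class and uses in-memory streams with onClose handlers in place of files. The class ApplyAndCloseDemo and the two helper methods are hypothetical names introduced only for this example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Stream;

// Hypothetical demo class; applyAndClose is copied from the answer above.
class ApplyAndCloseDemo {

    static <A extends AutoCloseable, B> B applyAndClose(Callable<A> sup, Function<A, B> op) {
        try (A res = sup.call()) {
            return op.apply(res);
        } catch (RuntimeException exc) {
            throw exc;
        } catch (Exception exc) {
            throw new RuntimeException("Wrapped in applyAndClose", exc);
        }
    }

    /** Counts a two-element stream; the counter records how often the stream was closed. */
    static long countAndTrackClose(AtomicInteger closed) {
        Callable<Stream<String>> open = () -> Stream.of("x", "y").onClose(closed::incrementAndGet);
        return applyAndClose(open, Stream::count);
    }

    /** Runs an operation that always fails and reports whether the stream was closed exactly once. */
    static boolean closedWhenOpFails() {
        AtomicInteger closed = new AtomicInteger();
        Callable<Stream<String>> open = () -> Stream.of("x").onClose(closed::incrementAndGet);
        Function<Stream<String>, Long> failing = s -> {
            throw new IllegalStateException("simulated failure");
        };
        try {
            applyAndClose(open, failing);
        } catch (IllegalStateException expected) {
            // Runtime exceptions pass through applyAndClose unchanged.
        }
        return closed.get() == 1;
    }

    public static void main(String[] args) {
        AtomicInteger closed = new AtomicInteger();
        System.out.println("count: " + countAndTrackClose(closed) + ", closed: " + closed.get());
        System.out.println("closed when op fails: " + closedWhenOpFails());
    }
}
```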