Today I tried to refactor this code, that reads ids from files in a directory,
Set<Long> ids = new HashSet<>();
for (String fileName : fileSystem.list("my-directory")) {
InputStream stream = fileSystem.openInputStream(fileName);
BufferedReader br = new BufferedReader(new InputStreamReader(stream));
String line;
while ((line = br.readLine()) != null) {
ids.add(Long.valueOf(line.trim()));
}
br.close();
}
using stream api
Set<Long> ids = fileSystem.list("my-directory").stream()
.map(fileName -> fileSystem::openInputStream)
.map(is -> new BufferedReader(new InputStreamReader(is)))
.flatMap(BufferedReader::lines)
.map(String::trim)
.map(Long::valueOf)
.collect(Collectors.toSet());
Then I found that IO streams will not be closed and I don't see a simple way to close them, because they are created inside the pipeline.
Any ideas?
upd: FileSystem in example is HDFS, Files#lines
and similar methods can't be used.
Stream interface extends java. lang. AutoCloseable . This (in theory) puts it on par with files, DB connections, and other resources requiring manual closing.
The try -with-resources statement ensures that each resource is closed at the end of the statement. Any object that implements java. lang. AutoCloseable , which includes all objects which implement java.
Therefore, if we forget to close the stream, the underlying channel will remain open and then we would end up with a resource leak.
The Java try with resources construct, AKA Java try-with-resources, is an exception handling mechanism that can automatically close resources like a Java InputStream or a JDBC Connection when you are done with them. To do so, you must open and use the resource within a Java try-with-resources block.
It is possible to hook into the stream to 'close' resources once all elements of the stream have been consumed. So it is possible to close the reader after all lines have been read with the following modification:
.flatMap(reader -> reader.lines().onClose(() -> close(reader)))
Where close(AutoClosable)
handles the IOException.
As a proof of concept, the following code and output has been tested:
import java.util.stream.Stream;
class Test {
public static void main(String[] args) {
Stream.of(1, 2, 3).flatMap(i ->
Stream.of(i, i * 2).onClose(() ->
System.out.println("Closed!")
)
).forEach(System.out::println);
}
}
1
2
Closed!
2
4
Closed!
3
6
Closed!
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With