I want to stream the lines contained in files but MOVING each file to another folder once it has been processed.
The current process is like this:
Explanation:
Stream
of File
sBufferedReader
for each one of themflatMap
to the lines Stream
of the BufferedReader
Code (omitted exceptions for simplicity):
(1) Stream.generate(localFileProvider::getNextFile)
(2) .map(file -> new BufferedReader(new InputStreamReader(new FileInputStream(file))))
(3) .flatMap(BufferedReader::lines)
(4) .map(System.out::println)
.MOVE_EACH_FILE_FROM_INPUT_FOLDER_TO_SOME_OTHER_FOLDER;
Would it be possible to move each file once it has been completely read and continue processing the other files in the stream?
No storage. Streams don't have storage for values; they carry values from a source (which could be a data structure, a generating function, an I/O channel, etc) through a pipeline of computational steps.
The file descriptor interface provides only simple functions for transferring blocks of characters, but the stream interface also provides powerful formatted input and output functions ( printf and scanf ) as well as functions for character- and line-oriented input and output.
A file stream is a sequence of bytes used to hold file data. Usually a file has only one file stream, namely the file's default data stream. However, on file systems that support multiple data streams, each file can have multiple file streams. One of these is the default data stream, which is unnamed.
You can chain a close action to a stream, which will be executed automatically in case of flatMap
:
Stream.generate(localFileProvider::getNextFile).takeWhile(Objects::nonNull)
.flatMap(file -> {
try {
Path p = file.toPath();
return Files.lines(p, Charset.defaultCharset()).onClose(() -> {
try { // move path/x/y/z to path/x/y/z.moved
Files.move(p, p.resolveSibling(p.getFileName()+".moved"));
} catch(IOException ex) { throw new UncheckedIOException(ex); }
});
} catch(IOException ex) { throw new UncheckedIOException(ex); }
})
.forEach(System.out::println);
It’s important that the documentation of onClose
states:
Close handlers are run when the
close()
method is called on the stream, and are executed in the order they were added.
So the moving close handler is executed after the already existing close handler that will close the file handle used for reading the lines.
I used Charset.defaultCharset()
to mimic the behavior of the nested constructors new InputStreamReader(new FileInputStream(file)))
of your question’s code, but generally, you should use a fixed charset, like the Files.lines
’s default UTF-8 whenever possible.
I would just create two methods:
public void processFile(File f);
public void moveFile(File f, File dstFolder);
then in lambda:
Stream.generate(localFileProvider::getNextFile).forEach(file->
{
processFile(file);
moveFile(file, dstFolder);
}
);
Actually it will be very easy if you can divide the logic into different method
public Path readFile(File eachFile) {
BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
//try-with-resources
try (Stream<String> lines = reader.lines()) {
lines.forEach(System.out::println);
}
catch (IOException e) {
e.printStackTrace();
}
return eachFile.toPath();
}
And then call this method for each file
(1) Stream.generate(localFileProvider::getNextFile)
(2) .map(this::readFile) //process each file
(3) .forEach(path->Files.move(path,Paths.get("new path"))); //then move each file
Can do something like this:
files
.map( file -> {
getBufferedReader( file ).lines()
.forEach( System.out::println );
return file;
} )
.forEach( this::moveFile );
Update for checked exceptions and Reader.close
:
Admittedly, this doesn't run close()
in a finally
block, so that is a downside. The point of this update is mainly to illustrate a way of dealing with checked exceptions in Java 8 streams.
Let's say you have the following utility code available:
private interface ThrowingFunction<I, O>
{
O apply( I input ) throws Exception;
}
private <I, O> Function<I, O> unchecked( ThrowingFunction<I, O> checked )
{
return i -> {
try {
return checked.apply( i );
}
catch ( Exception e ) {
throw new RuntimeException();
}
};
}
private interface ThrowingConsumer<T>
{
void consume( T input ) throws Exception;
}
private <T> Consumer<T> unchecked( ThrowingConsumer<T> checked )
{
return t -> {
try {
checked.consume( t );
}
catch ( Exception e ) {
throw new RuntimeException();
}
};
}
private BufferedReader getBufferedReader( File file ) throws FileNotFoundException
{
return new BufferedReader( new InputStreamReader( new FileInputStream( file )));
}
Writing the actual code then becomes:
files
.map( file -> {
Stream.of( file )
.map( unchecked( this::getBufferedReader ))
.map( reader -> {
reader.lines().forEach( System.out::println );
return reader;
} )
.forEach( unchecked( Reader::close ));
return file;
} )
.forEach( this::moveFile );
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With