I'm scanning a directory for files and then processing the results. Before further processing, I want to remove from the scan results any files that I already have in a datastore.
I'm trying to do this in a reactive fashion, using reactive MongoDB, but I'm unsure how to implement a filter that uses the result of a database query.
@Override
public Flux<File> findFiles(Directory directory) {
    // Only get these file types as we can't process anything else
    final Predicate<Path> extensions = path ->
            path.toString().endsWith(".txt") ||
            path.toString().endsWith(".doc") ||
            path.toString().endsWith(".pdf");
    final Set<File> files = fileService.findAll(Paths.get(directory.getPath()), extensions);
    final Stream<Video> fileStream = files
            .stream()
            .map(this::convertFileToDocument)
            // This is wrong (doesn't compile for a start), but how do I do something similar or of this nature?
            .filter(file -> fileRepository.findById(file.getId()));
    return Flux.fromStream(fileStream);
}
convertFileToDocument just maps the file to a POJO, nothing fun happening there.
How can I go about adding the filter based on the result of the findById, or is there a better way to achieve this?
If fileRepository.findById returns a Mono, I would recommend converting the stream to a Flux and then filtering with filterWhen, checking whether the Mono has an element. Something like:
final Stream<Video> fileStream = files
        .stream()
        .map(this::convertFileToDocument);
return Flux.fromStream(fileStream)
        .filterWhen(file -> fileRepository.findById(file.getId())
                .hasElement()
                .map(b -> !b));
This filters out every file for which findById returns a non-empty Mono, that is, every file that already exists in the database. If I misunderstood something, please let me know.
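To make the idea easier to try out, here is a minimal, self-contained sketch of the same filterWhen pattern. It assumes reactor-core is on the classpath and Java 16+ (for the record). FileDocument, STORE, findById and removeKnown are hypothetical stand-ins I made up for the POJO and the reactive repository in the question; they are not the asker's actual classes.

```java
import java.util.List;
import java.util.Map;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FilterWhenSketch {

    // Hypothetical stand-in for the document POJO from the question.
    record FileDocument(String id) {}

    // Hypothetical in-memory stand-in for the reactive repository's data.
    static final Map<String, FileDocument> STORE = Map.of(
            "a.txt", new FileDocument("a.txt"),
            "c.pdf", new FileDocument("c.pdf"));

    // Emits the stored document, or completes empty when the id is unknown
    // (assumed to mirror how the real findById behaves).
    static Mono<FileDocument> findById(String id) {
        return Mono.justOrEmpty(STORE.get(id));
    }

    // Keep only the scanned documents that are NOT already stored.
    static Flux<FileDocument> removeKnown(List<FileDocument> scanned) {
        return Flux.fromIterable(scanned)
                .filterWhen(doc -> findById(doc.id())
                        .hasElement()              // Mono<Boolean>: true if already stored
                        .map(exists -> !exists));  // invert: keep only unknown documents
    }

    public static void main(String[] args) {
        List<FileDocument> scanned = List.of(
                new FileDocument("a.txt"),
                new FileDocument("b.doc"),
                new FileDocument("c.pdf"));
        // block() is used here only to print the result of the demo;
        // a reactive method would simply return the Flux.
        System.out.println(removeKnown(scanned).collectList().block());
    }
}
```

With the sample data above, only the document with id b.doc passes the filter. Note that this pattern issues one lookup per scanned file, and that the sketch uses Flux.fromIterable rather than Flux.fromStream because a Java Stream can only be consumed once, so a Flux built from it cannot be subscribed to a second time.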