
Download and save a file from ClientRequest using ExchangeFunction in Project Reactor

I have a problem with correctly saving a file after its download completes in Project Reactor.

class HttpImageClientDownloader implements ImageClientDownloader {

    private final ExchangeFunction exchangeFunction;

    HttpImageClientDownloader() {
        this.exchangeFunction = ExchangeFunctions.create(new ReactorClientHttpConnector());
    }

    @Override
    public Mono<File> downloadImage(String url, Path destination) {

        ClientRequest clientRequest = ClientRequest.create(HttpMethod.GET, URI.create(url)).build();
        return exchangeFunction.exchange(clientRequest)
                .map(clientResponse -> clientResponse.body(BodyExtractors.toDataBuffers()))
                //.flatMapMany(clientResponse -> clientResponse.body(BodyExtractors.toDataBuffers()))
                .flatMap(dataBuffer -> {

                    AsynchronousFileChannel fileChannel = createFile(destination);
                    return DataBufferUtils
                            .write(dataBuffer, fileChannel, 0)
                            .publishOn(Schedulers.elastic())
                            .doOnNext(DataBufferUtils::release)
                            .then(Mono.just(destination.toFile()));


                });

    }

    private AsynchronousFileChannel createFile(Path path) {
        try {
            return AsynchronousFileChannel.open(path, StandardOpenOption.CREATE);
        } catch (Exception e) {
            throw new ImageDownloadException("Error while creating file: " + path, e);
        }
    }
}

So my question is: Is DataBufferUtils.write(dataBuffer, fileChannel, 0) blocking?

What about when the disk is slow?

My second question is about what happens when an ImageDownloadException occurs. I want to release the given data buffer in doOnNext; is that a good place for this kind of operation?

I also think this line:

            .map(clientResponse -> clientResponse.body(BodyExtractors.toDataBuffers()))

could be blocking...

K2mil J33 asked Jan 03 '23 11:01



1 Answer

Here's another (shorter) way to achieve that:

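// Stream the response body as a Flux of DataBuffers
Flux<DataBuffer> data = this.webClient.get()
        .uri("/greeting")
        .retrieve()
        .bodyToFlux(DataBuffer.class);

Path file = Files.createTempFile("spring", null);
WritableByteChannel channel = Files.newByteChannel(file, StandardOpenOption.WRITE);
// Write each buffer to the channel, release it once written, then emit the file
Mono<File> result = DataBufferUtils.write(data, channel)
        .map(DataBufferUtils::release)
        .then(Mono.just(file.toFile())); // file is a Path, so convert it to match Mono<File>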

To answer your first question: DataBufferUtils::write operations are not blocking because they use non-blocking I/O with channels. Writing to such a channel writes whatever it can to the output buffer, so a single write may consume the whole DataBuffer or just part of it.
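To illustrate what a non-blocking, possibly partial write to a file channel looks like, here is a minimal JDK-only sketch. It is not how Spring implements DataBufferUtils::write; the class name AsyncFileWriteSketch and the helper writeAll are hypothetical names chosen for this example. It opens an AsynchronousFileChannel with both CREATE and WRITE, and keeps issuing writes from the completion callback until the buffer is drained, so the calling thread never waits on the disk.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletableFuture;

public class AsyncFileWriteSketch {

    /** Hypothetical helper: writes all bytes and completes with the number of bytes written. */
    public static CompletableFuture<Long> writeAll(Path path, byte[] bytes) throws IOException {
        // WRITE must be requested explicitly; without it the channel is opened for reading only.
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(
                path, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        CompletableFuture<Long> done = new CompletableFuture<>();
        writeRemaining(channel, ByteBuffer.wrap(bytes), 0L, done);
        return done; // returned immediately; the write proceeds in the background
    }

    private static void writeRemaining(AsynchronousFileChannel channel, ByteBuffer buffer,
                                       long position, CompletableFuture<Long> done) {
        channel.write(buffer, position, null, new CompletionHandler<Integer, Void>() {
            @Override
            public void completed(Integer written, Void attachment) {
                long next = position + written;
                if (buffer.hasRemaining()) {
                    // Partial write: continue from the new file position.
                    writeRemaining(channel, buffer, next, done);
                } else {
                    closeQuietly(channel);
                    done.complete(next);
                }
            }

            @Override
            public void failed(Throwable error, Void attachment) {
                closeQuietly(channel);
                done.completeExceptionally(error);
            }
        });
    }

    private static void closeQuietly(AsynchronousFileChannel channel) {
        try {
            channel.close();
        } catch (IOException ignored) {
            // Nothing useful to do in this sketch.
        }
    }

    public static void main(String[] args) throws Exception {
        Path file = Files.createTempFile("async-write", ".bin");
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        // get() blocks here only so the demo can print the result.
        long written = writeAll(file, payload).get();
        System.out.println("wrote " + written + " bytes to " + file);
    }
}
```

In a reactive pipeline you would not call get(); the future (or the Flux returned by DataBufferUtils::write) would be composed with the rest of the chain instead.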

As for releasing the buffers, Flux::map or Flux::doOnNext is the right place to do it. But you're right: if an error occurs, you're still responsible for releasing the current buffer (and all the remaining ones). There might be something we can improve here in Spring Framework; please keep an eye on SPR-16782.

I don't see how your last sample shows anything blocking: all methods return reactive types and none are doing blocking I/O.

Brian Clozel answered Apr 06 '23 19:04
