Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

File handing in WebFlux (Reactor)

I'm working on a greenfield reactive project where a lot of file handling IO is going on. Is it sufficient if I write the IO code in an imperative blocking manner and then wrap them in a Mono, publish them on boundedElastic scheduler? Will the boundedElastic pool size limit the number of concurrent operations?

If this is not the correct method, can you show an example how to write bytes to a file using Reactor?

like image 456
Peter Avatar asked Sep 05 '25 02:09

Peter


2 Answers

Is it sufficient if I write the IO code in an imperative blocking manner and then wrap them in a Mono, publish them on boundedElastic scheduler?

This comes down to opinion on some level - but no, certainly not ideal not for a reactive greenfield project IMHO. boundedElastic() schedulers are great for interfacing with blocking IO when you must, but they're not a good replacement when a true non-blocking solution exists. (Sometimes this is a bit of a moot point with file handling, since it depends if it's possible for the underlying system to do it asynchronously - but usually that's possible these days.)

In your case, I'd look at wrapping AsynchronousFileChannel in a reactive publisher. You'll need to use create() or push() for this and then make explicit calls to the sink, but exactly how you do this depends on your use case. As a "simplest case" for file writing, you could feasibly do something like:

static Mono<Void> writeToFile(AsynchronousFileChannel channel, String content) {
    return Mono.create(sink -> {
        byte[] bytes = content.getBytes();
        ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
        buffer.put(bytes);
        buffer.flip();

        channel.write(buffer, 0, null, new CompletionHandler<>() {
            @Override
            public void completed(Integer result, Object attachment) {
                sink.success();
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                sink.error(exc);
            }
        });
    });
}

A more thorough / comprehensive example of bridging the two APIs can be found here - there's almost certainly others around also.

like image 53
Michael Berry Avatar answered Sep 07 '25 22:09

Michael Berry


After some researching the java.nio and Spring library I have found the convenient approach to write strings to file as DataBuffers (which perfectly connect with WebFlux) into AsynchronousFileChannel using Spring classes.

It's not "truly" reactive way to write lines in file, but asyncronous and it is still better than using some standard blocking API.

public Mono<Void> writeRows(Flux<String> rowsFlux) {
    DefaultDataBufferFactory bufferFactory = new DefaultDataBufferFactory();
    CharSequenceEncoder encoder = CharSequenceEncoder.textPlainOnly();

    Flux<DataBuffer> dataBufferFlux = rowsFlux.map(line ->
            encoder.encodeValue(line, bufferFactory, ResolvableType.NONE, null, null)
    );
    return DataBufferUtils.write(
            dataBufferFlux,
            Path.of("/your_path_to_save_file/file.txt"),
            StandardOpenOption.CREATE_NEW
    );
}

Of course, for better performance in this case you can buffer your strings in flux and then append those strings to one string and create a data buffer from it.

Or if you already have Flux of data buffers you can write them to file using DataBufferUtils directly.

like image 25
kerbermeister Avatar answered Sep 07 '25 23:09

kerbermeister



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!