Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

is there a way to read a InputStream asynchronously with Reactor or transform to bytes?

I'm trying to upload a file to S3, but JVM says that i have a thread-blocking method calls in code fragments where threads should not be blocked when calling file.readAllBytes(), so is there a way to is there a way to make the method asynchronous with Flux or Mono? or any other way to solve that problem?

private Mono<Boolean> uploadFile(InputStream file, String bucket, String name) {
        try {
            return uploadAdapter.uploadObject(bucket,name,file.readAllBytes());
        } catch (IOException e) {
            return Mono.just(false);
        }
    }
@Override
    public Mono<Boolean> uploadObject(String bucketName, String objectKey, byte[] fileContent) {
        return Mono.fromFuture(
                        s3AsyncClient.putObject(configurePutObject(bucketName, objectKey),
                                AsyncRequestBody.fromBytes(fileContent)))
                .map(response -> response.sdkHttpResponse().isSuccessful());
    }
like image 644
Daniel Fernando Gomez Avatar asked Nov 06 '22 23:11

Daniel Fernando Gomez


1 Answers

Since InputStream is a synchronous API, you have 2 choices, and that's true about any other synchronous API:

  1. To switch to another API. This may be a good and possible solution for many problems. The reactive concepts and asynchronous in general are very common and for most of the needs, there is an alternative async library that will do the same thing. In your case, you can use java.nio2, or functions from the Reactor-Netty library, which has great solutions for this use case.
  2. Use another scheduler. Project reactor suggests that all of your asynchronous calls will operate on one non-blocking scheduler, and for synchronous calls, use another (blocking) thread-per-request scheduler. There are two schedulers like this that you can use: single() and boundedElastic(). The difference is that boundedElastic() limits the number of threads that you can open, so you will end up using your threads as blocking queues, which is safer than single().
like image 198
user7551211 Avatar answered Nov 11 '22 04:11

user7551211