I need to call an upstream service (Azure Blob Service) to push data to an OutputStream, which I then need to turn around and push back to the client through Akka. Without Akka (just servlet code), I'd get the ServletOutputStream and pass it to the Azure service's method.
The closest I've managed to stumble upon, and clearly this is wrong, is something like this:
Source<ByteString, OutputStream> source = StreamConverters.asOutputStream().mapMaterializedValue(os -> {
    blobClient.download(os);
    return os;
});
ResponseEntity responseEntity = HttpEntities.create(ContentTypes.APPLICATION_OCTET_STREAM, preAuthData.getFileSize(), source);
sender().tell(new RequestResult(responseEntity, StatusCodes.OK), self());
The idea is that I'm calling an upstream service to get an OutputStream populated by calling blobClient.download(os).
It seems like the lambda function gets called and returns, but afterwards it fails because there's no data or something. As if I'm not supposed to have that lambda function do the work, but perhaps return some object that does the work? Not sure.
How does one do this?
The real issue here is that the Azure API is not designed for back-pressuring. There is no way for the output stream to signal back to Azure that it is not ready for more data. To put it another way: if Azure pushes data faster than you are able to consume it, there will have to be some ugly buffer overflow failure somewhere.
Accepting this fact, the next best thing we can do is:
- Use Source.lazySource to only start downloading data when there is downstream demand (that is, the source is being run and data is being requested).
- Make the download call in some other thread so that it continues executing without blocking the source from being returned. One way to do this is with a Future (I'm not sure what Java best practices are, but it should work fine either way). Although it won't matter initially, you may need to choose an execution context other than system.dispatcher; it all depends on whether download is blocking or not.

I apologize in advance if this Java code is malformed. I use Akka with Scala, so this is all from looking at the Akka Java API and Java syntax reference.
ResponseEntity responseEntity = HttpEntities.create(
    ContentTypes.APPLICATION_OCTET_STREAM,
    preAuthData.getFileSize(),
    // Wait until there is downstream demand to initialize the source...
    Source.lazySource(() -> {
        // Pre-materialize the output stream before the source starts running
        Pair<OutputStream, Source<ByteString, NotUsed>> pair =
            StreamConverters.asOutputStream().preMaterialize(system);
        // Start writing into the download stream in a separate thread.
        // Closing the stream afterwards lets the source complete
        // (this assumes download does not close the stream itself).
        Futures.future(() -> {
            try (OutputStream os = pair.first()) {
                blobClient.download(os);
            }
            return pair.first();
        }, system.getDispatcher());
        // Return the source - it should start running since `lazySource` indicated demand
        return pair.second();
    })
);
sender().tell(new RequestResult(responseEntity, StatusCodes.OK), self());
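To see why the download call has to run on its own thread, here is a rough plain-JDK analogy that does not use Akka at all. It bridges a push-style API (one that writes into an OutputStream) to something a consumer can pull from, using piped streams. The names PushToPullBridge, PushApi, asInputStream and drain are made up for this sketch, and PushApi is only a stand-in for something like blobClient.download(os). It assumes Java 9 or later for readAllBytes.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;

final class PushToPullBridge {

    /** Hypothetical stand-in for a push-style call such as blobClient.download(os). */
    interface PushApi {
        void download(OutputStream target) throws IOException;
    }

    /** Returns a stream to pull from while the push API writes on a separate thread. */
    static InputStream asInputStream(PushApi api) throws IOException {
        // Small buffer on purpose: the writer blocks whenever the buffer is full.
        PipedInputStream in = new PipedInputStream(1024);
        PipedOutputStream out = new PipedOutputStream(in);
        Thread writer = new Thread(() -> {
            // Closing the output stream is what signals end-of-data to the reader.
            try (OutputStream os = out) {
                api.download(os);
            } catch (IOException e) {
                // Sketch only: real code would surface this failure to the reader.
            }
        });
        writer.setDaemon(true);
        writer.start();
        return in; // returned immediately, before the download has finished
    }

    /** Convenience helper: pulls everything the push API produces. */
    static byte[] drain(PushApi api) {
        try (InputStream in = asInputStream(api)) {
            return in.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] payload = new byte[10_000]; // deliberately larger than the pipe buffer
        System.out.println(drain(os -> os.write(payload)).length);
    }
}
```

If download were called on the same thread that later reads, the write would block as soon as the 1024-byte buffer filled up and nothing would ever drain it. That is the same reason the Akka version hands download off to a Future before returning the source.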