In reference to my earlier question here, I am trying to figure out the correct way to stream a large object from the server to the client.
The server is doing a write
followed by a flush
, and there's a handler in the Channel
which will take the large object and iteratively decompose it into small chunks doing a writeAndFlush
on each chunk. I was hoping this would turn the large object into a stream of manageable messages the client could consume and reconstruct as needed.
What I see is each of those sits in the outbound buffer until either they've all be written and can be sent, or I OOM the server. I'd like them to get flushed/sent as they're written to avoid this issue. (If I don't OOM the server, it does come into the client as a stream like designed, it's that the transmission of any data is blocked until all data is written).
If I try to do the decompose operation by dispatching it via Callable
to the Executor
associated with the ChannelHandlerContext
and then marking the original ChannelPromise
as successful, I still see the same behavior.
If I instead use a non-netty Executor
, I seem to get the correct behavior, I can see data landing on the client immediately, but this feels like the wrong solution.
When decomposing the large object into smaller, writable ones, I do see that immediately after I write the first object, the channel writeability changes to false. I realize, as @norman-maurer pointed out, that I should stop writing at this point. But it's not clear how to get the event that I can safely resume writing. It's also not clear how you'd handle the write in that case, but maybe that is because I don't see how you can get the event.
Norman did answer this but I'll try and make it a bit clearer.
It sounds like your first attempt is looping something like this
while(moreToSend) {
channel.writeAndFlush(...)
moreToSend = checkFinished();
}
It sounds like you're doing this in the channel's IO thread which is the thread which will write your data to the socket. The thread may not be able to write the data while it's processing the loop. Netty's only option is to queue the writes.
I'm not completely familiar with Netty 4.x (still on 3.x) however it sounds like the same thing is happening in your second attempt. By dispatching the Callable to the executor associated with the ChannelHandlerContext
then, unless you've wrapped you handler in another executor, you're actually asking the Callable to be executed on the IO thread and the same problem applies.
Norman is saying that you should call writeAndFlush
until channel.isWritable
returns false. You can resume writing when Netty raises channelWritabilityChanged
and channel.isWritable
returns true. Note channelWritabilityChanged
is raised on ChannelInboundHandler
which implies that your handler needs to be both ChannelInboundHandler
and ChannelOutboundHandler
(assuming it's already an outbound handler).
Alternatively, rather than trying to write as much as possible in one go, you can write the first chunk and register a listener with the returned ChannelFuture
. When operationComplete
is called back, then if future.isSuccess
returns true write the next chunk in the same way. This way you will write chunks as and when the previous chunk has been flushed to the OS send buffers. It should also work well if you need to mix sending the large object with other traffic.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With