
How can I know if there is no data to read in Netty ByteBuf?

Tags: java, netty

I'm new to Netty. A file transfer problem has been confusing me for days. I want to send an image file from the client to the server.

The code below runs, but I can only open the received image file after I forcibly shut down the server. Otherwise, it shows "It looks like you don't have permission to view this file. Check the permissions and try again". So I want to close the FileOutputStream when there is no more data in the ByteBuf, using ByteBuf.isReadable(), but the else block of channelRead in the server handler is never reached, so the check is useless.

Besides, when I send a text file, it can be opened normally while the server is still running. I don't want to shut down the server after every transfer. Please give me some suggestions on how to solve this.

This is FileClientHandler

public class FileClientHandler extends ChannelInboundHandlerAdapter {

private int readLength = 8;

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    sendFile(ctx.channel());
}

private void sendFile(Channel channel) throws IOException {
    File file = new File("C:\\Users\\xxx\\Desktop\\1.png");
    FileInputStream fis = new FileInputStream(file);
    BufferedInputStream bis = new BufferedInputStream(fis);

    for (;;) {
        byte[] bytes = new byte[readLength];
        int readNum = bis.read(bytes, 0, readLength);
        // System.out.println(readNum);
        if (readNum == -1) {
            bis.close();
            fis.close();
            return;
        }
        sendToServer(bytes, channel, readNum);
    }
}

private void sendToServer(byte[] bytes, Channel channel, int length)
        throws IOException {
    channel.writeAndFlush(Unpooled.copiedBuffer(bytes, 0, length));
}

}

This is FileServerHandler

public class FileServerHandler extends ChannelInboundHandlerAdapter {

private File file = new File("C:\\Users\\xxx\\Desktop\\2.png");
private FileOutputStream fos;

public FileServerHandler() {
    try {
        if (!file.exists()) {
            file.createNewFile();
        } else {
            file.delete();
            file.createNewFile();
        }
        fos = new FileOutputStream(file);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
        throws Exception {
    try {
        ByteBuf buf = (ByteBuf) msg;
        if (buf.isReadable()) {
            buf.readBytes(fos, buf.readableBytes());
            fos.flush();
        } else {
            System.out.println("I want to close fileoutputstream!");
            buf.release();
            fos.flush();
            fos.close();
        }

    } catch (Exception e) {
        e.printStackTrace();
    }
}
}
Leisure Dong asked Aug 16 '18 06:08



1 Answer

Fixing the server side

In the Netty world, there are multiple "events":

  • channelActive
  • channelRead
  • channelReadComplete
  • channelInactive
  • exceptionCaught
  • more...

Of these "events", you probably already know what channelRead does (since you're using it), but another one that you seem to need is channelInactive. It is called when the other endpoint shuts the connection down, and you can use it like this:

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    System.out.println("I want to close fileoutputstream!");
    fos.close();
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
        throws Exception {
    ByteBuf buf = (ByteBuf) msg; // declared before the try so the finally block can release it
    try {
        // if (buf.isReadable()) { // a buf should always be readable here
            buf.readBytes(fos, buf.readableBytes());
        //    fos.flush(); // flushing is always done when closing
        //} else {
        //    System.out.println("I want to close fileoutputstream!");
        //    buf.release(); // Should be placed in the finally block
        //    fos.flush();
        //    fos.close();
        //}
    } catch (Exception e) {
         e.printStackTrace();
    } finally {
         buf.release(); // Should always be done, even if writing to the file fails
    }
}

However, how does the server know the connection has shut down? At the moment, the client does not shut the connection down; instead, it keeps running in the background forever, keeping the connection alive.
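
The difference between "nothing to read right now" and "the sender has finished" can be seen without Netty. The following is a minimal sketch, not Netty code: it uses an in-memory pipe from the JDK as a stand-in for the network connection, and the class name EndOfDataDemo and its Observation type are hypothetical names made up for this illustration.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;

// Hypothetical demo class; the in-memory pipe stands in for the network connection.
final class EndOfDataDemo {

    /** What the receiving side observed at two points in time. */
    static final class Observation {
        final int availableWhileOpen; // bytes waiting after draining, sender still open
        final int readAfterClose;     // result of read() once the sender has closed

        Observation(int availableWhileOpen, int readAfterClose) {
            this.availableWhileOpen = availableWhileOpen;
            this.readAfterClose = readAfterClose;
        }
    }

    static Observation run(byte[] payload) {
        if (payload.length >= 1024) {
            // Keeps the single-threaded demo from blocking on a full pipe buffer.
            throw new IllegalArgumentException("payload must be smaller than 1024 bytes");
        }
        try (PipedOutputStream sender = new PipedOutputStream();
             PipedInputStream receiver = new PipedInputStream(sender, 1024)) {
            sender.write(payload);
            sender.flush();

            // Drain everything that has arrived so far.
            byte[] chunk = new byte[64];
            while (receiver.available() > 0) {
                receiver.read(chunk);
            }
            // The sender is still open: "no data right now" does not mean "finished".
            int availableWhileOpen = receiver.available();

            // Only closing the sending side produces an end-of-stream signal.
            sender.close();
            int readAfterClose = receiver.read();

            return new Observation(availableWhileOpen, readAfterClose);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Observation o = run(new byte[] {1, 2, 3, 4, 5});
        System.out.println("available while open: " + o.availableWhileOpen);
        System.out.println("read after close: " + o.readAfterClose);
    }
}
```

While the sender is open, the receiver only sees that zero bytes are waiting; the end-of-stream value -1 appears only after the sender closes. This mirrors why the server should close its file in channelInactive rather than guess from an empty buffer.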

Fixing the client side

To properly shut down the connection from the client, we need to call channel.close(). However, we cannot simply insert this before the return line, because that causes a race condition between sending the data and closing the connection in the network layer, potentially dropping data.

To properly handle these conditions, Netty uses a Future system that allows code to handle events after asynchronous actions happen.
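
The idea of acting only after an asynchronous write has completed can be illustrated without Netty. The following is a minimal sketch that swaps in the JDK's CompletableFuture for Netty's ChannelFuture; FakeChannel and its methods are hypothetical names invented for this illustration, and the single-threaded executor is an assumption that makes the writes complete in the order they were issued.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo: CompletableFuture stands in for Netty's ChannelFuture.
final class CloseAfterLastWriteDemo {

    /** Hypothetical stand-in for a channel whose writes complete asynchronously. */
    static final class FakeChannel {
        // One I/O thread, so writes complete in the order they were submitted.
        private final ExecutorService ioThread = Executors.newSingleThreadExecutor();
        final List<String> events = Collections.synchronizedList(new ArrayList<>());

        CompletableFuture<Void> writeAndFlush(String chunk) {
            return CompletableFuture.runAsync(() -> events.add("write " + chunk), ioThread);
        }

        void close() {
            events.add("close");
            ioThread.shutdown();
        }
    }

    /** Sends every chunk, then closes only after the last write has completed. */
    static List<String> sendAll(List<String> chunks) {
        FakeChannel channel = new FakeChannel();
        CompletableFuture<Void> lastWrite = null;
        for (String chunk : chunks) {
            lastWrite = channel.writeAndFlush(chunk);
        }
        if (lastWrite == null) {
            // Nothing was written, so there is no write to wait for.
            channel.close();
        } else {
            // Register the close as a callback on the last write, then wait
            // for it here only so the demo can return the final event log.
            lastWrite.thenRun(channel::close).join();
        }
        return new ArrayList<>(channel.events);
    }

    public static void main(String[] args) {
        System.out.println(sendAll(Arrays.asList("a", "b", "c")));
    }
}
```

Calling close() directly after the loop instead would race with the writes still queued on the I/O thread, which is the same problem described above for channel.close().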

Luckily for us, Netty already has a built-in solution for this; we only need to wire it up. To do so, we have to keep track of the latest ChannelFuture returned by Netty's write method.

To properly implement this solution, we change sendToServer to return the result of the write method:

private ChannelFuture sendToServer(byte[] bytes, Channel channel, int length)
        throws IOException {
    return channel.writeAndFlush(Unpooled.copiedBuffer(bytes, 0, length));
}

Then we keep track of this return value, and add a listener containing Netty's built-in solution (ChannelFutureListener.CLOSE) when we want to close the connection:

ChannelFuture lastFuture = null;
for (;;) {
    byte[] bytes = new byte[readLength];
    int readNum = bis.read(bytes, 0, readLength);
    // System.out.println(readNum);
    if (readNum == -1) {
        bis.close();
        fis.close();
        if(lastFuture == null) { // When our file is 0 bytes long, this is true
            channel.close();
        } else {
            lastFuture.addListener(ChannelFutureListener.CLOSE);
        }
        return;
    }
    lastFuture = sendToServer(bytes, channel, readNum);
}
Ferrybig answered Oct 26 '22 22:10
