Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

FileChannel zero-copy transferTo fails to copy bytes to SocketChannel

I'm seeing some strange behavior when transferring large files from file to socket using zero-copy in Java. My environments:

  • Windows 7 64-bit JDK 1.6.0_45 and 1.7.0_79.
  • Centos 6.6 64-bit JDK 1.6.0_35

What the program does: the client copies an input file into a socket, and the server copies the socket into an output file, using the zero-copy methods transferTo and transferFrom respectively. Not all bytes reach the server if the file is relatively large: 100 MB+ on Windows and 2 GB+ on CentOS. Client and server reside on the same machine, and the localhost address is used to transfer data.

The behavior is different depending on OS. On Windows, the client completes transferTo method successfully. The number of transferred bytes is equal to input file size.

long bytesTransferred = fileChannel.transferTo(0, inputFile.length(), socketChannel);

The server on the other hand, reports a lower number of received bytes.

long transferFromByteCount = fileChannel.transferFrom(socketChannel, 0, inputFile.length());

On Linux, bytesTransferred on the client is 2 GB even if the input file size is 4 GB. There is sufficient space on both configurations.

On Windows I was able to transfer a 130 MB file with one of the following workarounds: 1) increasing the receive buffer size on the server, or 2) adding a thread sleep in the client. This leads me to think that the transferTo method on the client completes when all bytes have been handed to the socket send buffer, not when they have reached the server. Whether or not those bytes make it to the server is not guaranteed, which creates problems for my use case.

On Linux, the maximum file size that I'm able to transfer with a single transferTo invocation is 2 GB; however, at least the client reports the correct number of bytes sent to the server.

My questions: what's the best way for the client to ensure guaranteed delivery of the file to the server, cross-platform? What mechanisms are used to emulate sendfile() on Windows?

Here's the code:

Client - ZeroCopyClient.java:

import org.apache.commons.io.FileUtils;

import java.io.*;
import java.net.*;
import java.nio.channels.*;

/**
 * Test client: streams a file to the server listening on localhost:8083 using
 * {@link FileChannel#transferTo}, then compares the input file with the
 * {@code <input>.out} file.
 *
 * <p>Arguments: {@code args[0]} is the path of the file to send; the optional
 * {@code args[1]} is a number of milliseconds to sleep before exiting.
 */
public class ZeroCopyClient {

    public static void main(String[] args) throws IOException, InterruptedException {

        final File inputFile = new File(args[0]);
        // Sample the length once so the loop bound and every log line agree.
        final long fileLength = inputFile.length();

        FileInputStream fileInputStream = new FileInputStream(inputFile);
        try {
            FileChannel fileChannel = fileInputStream.getChannel();
            SocketChannel socketChannel = SocketChannel.open();
            try {
                // The channel is in blocking mode, so connect() returns only once connected.
                socketChannel.connect(new InetSocketAddress("localhost", 8083));

                System.out.println("sending " + fileLength + " bytes to " + socketChannel);

                long startTime = System.currentTimeMillis();
                long totalBytesTransferred = 0;
                // transferTo may move fewer bytes than requested, so repeat from the current offset.
                while (totalBytesTransferred < fileLength) {
                    long st = System.currentTimeMillis();
                    long bytesTransferred = fileChannel.transferTo(totalBytesTransferred, fileLength - totalBytesTransferred, socketChannel);
                    long et = System.currentTimeMillis();
                    System.out.println("sent " + bytesTransferred + " out of " + fileLength + " in " + (et-st) + " millis");

                    // No progress and nothing left in the file at this offset: the file shrank
                    // while we were sending it. Fail instead of looping forever.
                    if (bytesTransferred <= 0 && fileChannel.size() <= totalBytesTransferred) {
                        throw new EOFException("input file truncated to " + fileChannel.size()
                                + " bytes after " + totalBytesTransferred + " of " + fileLength + " bytes were sent");
                    }
                    totalBytesTransferred += bytesTransferred;
                }
                long endTime = System.currentTimeMillis();

                System.out.println("sent: totalBytesTransferred= " + totalBytesTransferred + " / " + fileLength + " in " + (endTime-startTime) + " millis");

                final File outputFile = new File(inputFile.getAbsolutePath() + ".out");
                boolean copyEqual = FileUtils.contentEquals(inputFile, outputFile);
                System.out.println("copyEqual= " + copyEqual);

                // Optional delay before the connection is closed and the process exits.
                if (args.length > 1) {
                    System.out.println("sleep: " + args[1] + " millis");
                    Thread.sleep(Long.parseLong(args[1]));
                }
            } finally {
                socketChannel.close();
            }
        } finally {
            // Closing the stream also closes the FileChannel obtained from it.
            fileInputStream.close();
        }
    }
}

Server - ZeroCopyServer.java:

import java.io.*;
import java.net.*;
import java.nio.channels.*;
import org.apache.commons.io.FileUtils;

/**
 * Test server: (re)creates an input file of the requested size, accepts one
 * connection on localhost:8083, copies the socket into {@code <input>.out}
 * using {@link FileChannel#transferFrom}, and compares the two files.
 *
 * <p>Arguments: {@code args[0]} is the path of the input file to create;
 * {@code args[1]} is its size in units of 1024*1024 bytes.
 */
public class ZeroCopyServer {

    public static void main(String[] args) throws IOException {

        final File inputFile = new File(args[0]);
        inputFile.delete();
        final File outputFile = new File(inputFile.getAbsolutePath() + ".out");
        outputFile.delete();

        createTempFile(inputFile, Long.parseLong(args[1])*1024L*1024L);
        final long expectedLength = inputFile.length();

        System.out.println("input file length: " + expectedLength + " : output file.exists= " + outputFile.exists());

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        try {
            serverSocketChannel.socket().setReceiveBufferSize(8*1024*1024);
            System.out.println("server receive buffer size: " + serverSocketChannel.socket().getReceiveBufferSize());
            serverSocketChannel.socket().bind(new InetSocketAddress("localhost", 8083));
            System.out.println("waiting for connection");
            SocketChannel socketChannel = serverSocketChannel.accept();
            try {
                System.out.println("connected. client channel: " + socketChannel);

                long startTime = System.currentTimeMillis();
                long transferFromByteCount = 0;
                FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
                try {
                    FileChannel fileChannel = fileOutputStream.getChannel();
                    // A single transferFrom call may return before all requested bytes have
                    // arrived, so keep appending at the current offset until done.
                    while (transferFromByteCount < expectedLength) {
                        long count = fileChannel.transferFrom(socketChannel, transferFromByteCount, expectedLength - transferFromByteCount);
                        if (count <= 0) {
                            // The peer closed the connection before sending everything.
                            break;
                        }
                        transferFromByteCount += count;
                    }
                } finally {
                    // Closing the stream also closes the FileChannel obtained from it.
                    fileOutputStream.close();
                }
                long endTime = System.currentTimeMillis();
                System.out.println("received: transferFromByteCount= " + transferFromByteCount + " : outputFile= " + outputFile.length() + " : inputFile= " + expectedLength + " bytes in " + (endTime-startTime) + " millis");

                boolean copyEqual = FileUtils.contentEquals(inputFile, outputFile);
                System.out.println("copyEqual= " + copyEqual);
            } finally {
                socketChannel.close();
            }
        } finally {
            serverSocketChannel.close();
        }

    }

    /**
     * Creates {@code file} with its length set to {@code size} bytes and a random
     * double written at the start.
     */
    private static void createTempFile(File file, long size) throws IOException{
        RandomAccessFile f = new RandomAccessFile(file.getAbsolutePath(), "rw");
        try {
            f.setLength(size);
            // presumably here so the content differs between runs
            f.writeDouble(Math.random());
        } finally {
            f.close();
        }
    }

}

UPDATE 1: Linux code fixed with loop.

UPDATE 2: One possible workaround I'm considering requires client-server cooperation. At the end of the transmission, the server writes the length of the received data back to the client, and the client reads it in blocking mode.

Server responds:

// Report how many bytes were received so the client can confirm delivery.
ByteBuffer response = ByteBuffer.allocate(8);
response.putLong(transferFromByteCount);
response.flip();
// Drain the buffer in case a write accepts fewer bytes than remain.
while (response.hasRemaining()) {
    socketChannel.write(response);
}
// Close the accepted connection as well as the listening channel.
socketChannel.close();
serverSocketChannel.close();

The client blocks with read:

// Block until the server's 8-byte count has arrived in full.
ByteBuffer response = ByteBuffer.allocate(8);
// A single read() may deliver only part of the 8 bytes, so read until the buffer is full.
while (response.hasRemaining()) {
    if (socketChannel.read(response) < 0) {
        throw new java.io.EOFException("connection closed after " + response.position() + " of 8 response bytes");
    }
}
response.flip();
long totalBytesReceived = response.getLong();

As a result, the client waits for the bytes to pass through send and receive socket buffers, and in fact waits for bytes to get stored in the output file. There is no need to implement out-of-band acknowledgements and there's also no need for the client to wait as suggested in section II.A https://linuxnetworkstack.files.wordpress.com/2013/03/paper.pdf in case file content is mutable.

"wait an “appropriate” amount of time before rewriting the same portion of file"

UPDATE 3:

A modified example incorporating fixes by @EJP and @the8472, with both length and file checksum verification, without output tracing. Note that computing CRC32 checksum for a large file may take a few seconds to complete.

Client:

import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import org.apache.commons.io.FileUtils;

/**
 * Sends a file to the server on localhost:8083 using
 * {@link FileChannel#transferTo} and verifies delivery.
 *
 * <p>Protocol: the client first sends 16 bytes (file length, then CRC32, each
 * as a long), then the file content. The server answers with 16 bytes (bytes
 * received, then CRC32 of the output file).
 *
 * <p>Arguments: {@code args[0]} is the path of the file to send.
 */
public class ZeroCopyClient {

    public static void main(String[] args) throws IOException {

        final File inputFile = new File(args[0]);
        // Sample the length once so the header and the transfer loop agree.
        final long fileLength = inputFile.length();

        FileInputStream fileInputStream = new FileInputStream(inputFile);
        try {
            FileChannel fileChannel = fileInputStream.getChannel();
            SocketChannel socketChannel = SocketChannel.open();
            try {
                // The channel is in blocking mode, so connect() returns only once connected.
                socketChannel.connect(new InetSocketAddress("localhost", 8083));

                //send input file length and CRC32 checksum to server
                long checksumCRC32 = FileUtils.checksumCRC32(inputFile);
                ByteBuffer request = ByteBuffer.allocate(16);
                request.putLong(fileLength);
                request.putLong(checksumCRC32);
                request.flip();
                while (request.hasRemaining()) {
                    socketChannel.write(request);
                }

                // transferTo may move fewer bytes than requested, so repeat from the current offset.
                long totalBytesTransferred = 0;
                while (totalBytesTransferred < fileLength) {
                    long bytesTransferred = fileChannel.transferTo(totalBytesTransferred, fileLength - totalBytesTransferred, socketChannel);
                    // No progress and nothing left in the file at this offset: the file shrank
                    // while we were sending it. Fail instead of looping forever.
                    if (bytesTransferred <= 0 && fileChannel.size() <= totalBytesTransferred) {
                        throw new EOFException("input file truncated to " + fileChannel.size()
                                + " bytes after " + totalBytesTransferred + " of " + fileLength + " bytes were sent");
                    }
                    totalBytesTransferred += bytesTransferred;
                }

                //receive output file length and CRC32 checksum from server
                ByteBuffer response = ByteBuffer.allocate(16);
                readFully(socketChannel, response);
                response.flip();
                long totalBytesReceived = response.getLong();
                long outChecksumCRC32 = response.getLong();

                System.out.println("length equal= " + (fileLength == totalBytesReceived));
                System.out.println("CRC32 equal= " + (checksumCRC32 == outChecksumCRC32));
            } finally {
                socketChannel.close();
            }
        } finally {
            // Closing the stream also closes the FileChannel obtained from it.
            fileInputStream.close();
        }

    }

    /**
     * Reads from {@code channel} until {@code buffer} has no space remaining.
     * A single read() may return fewer bytes than the buffer holds.
     *
     * @throws EOFException if the channel reaches end of stream first
     */
    private static void readFully(SocketChannel channel, ByteBuffer buffer) throws IOException {
        while (buffer.hasRemaining()) {
            if (channel.read(buffer) < 0) {
                throw new EOFException("connection closed after " + buffer.position()
                        + " of " + buffer.capacity() + " expected bytes");
            }
        }
    }
}

Server:

import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import org.apache.commons.io.FileUtils;

/**
 * Accepts one connection on port 8083 and stores the received file using
 * {@link FileChannel#transferFrom}.
 *
 * <p>Protocol: the client first sends 16 bytes (file length, then CRC32, each
 * as a long), then the file content. The server answers with 16 bytes (bytes
 * received, then CRC32 of the output file).
 *
 * <p>Arguments: {@code args[0]} is the path of the output file.
 */
public class ZeroCopyServer {

    public static void main(String[] args) throws IOException {

        final File outputFile = new File(args[0]);

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        try {
            serverSocketChannel.socket().bind(new InetSocketAddress(8083));
            SocketChannel socketChannel = serverSocketChannel.accept();
            try {
                //read input file length and CRC32 checksum sent by client
                ByteBuffer request = ByteBuffer.allocate(16);
                readFully(socketChannel, request);
                request.flip();
                long length = request.getLong();
                long checksumCRC32 = request.getLong();

                long totalBytesTransferFrom = 0;
                FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
                try {
                    FileChannel fileChannel = fileOutputStream.getChannel();
                    // transferFrom may return before all requested bytes have arrived,
                    // so keep appending at the current offset until done.
                    while (totalBytesTransferFrom < length) {
                        long transferFromByteCount = fileChannel.transferFrom(socketChannel, totalBytesTransferFrom, length-totalBytesTransferFrom);
                        if (transferFromByteCount <= 0){
                            // The peer closed the connection before sending everything.
                            break;
                        }
                        totalBytesTransferFrom += transferFromByteCount;
                    }
                } finally {
                    // Close the output file before it is re-read for the checksum.
                    fileOutputStream.close();
                }

                long outChecksumCRC32 = FileUtils.checksumCRC32(outputFile);

                //write output file length and CRC32 checksum back to client
                ByteBuffer response = ByteBuffer.allocate(16);
                response.putLong(totalBytesTransferFrom);
                response.putLong(outChecksumCRC32);
                response.flip();
                while (response.hasRemaining()) {
                    socketChannel.write(response);
                }

                System.out.println("CRC32 equal= " + (checksumCRC32 == outChecksumCRC32));
            } finally {
                socketChannel.close();
            }
        } finally {
            serverSocketChannel.close();
        }

    }

    /**
     * Reads from {@code channel} until {@code buffer} has no space remaining.
     * A single read() may return fewer bytes than the buffer holds.
     *
     * @throws EOFException if the channel reaches end of stream first
     */
    private static void readFully(SocketChannel channel, ByteBuffer buffer) throws IOException {
        while (buffer.hasRemaining()) {
            if (channel.read(buffer) < 0) {
                throw new EOFException("connection closed after " + buffer.position()
                        + " of " + buffer.capacity() + " expected bytes");
            }
        }
    }
}
like image 663
Sergei Rodionov Avatar asked Oct 19 '22 12:10

Sergei Rodionov


1 Answers

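The solution is to check the byte count returned by fileChannel.transferFrom and keep calling it, advancing the position each time, until the expected number of bytes has been received: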

import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import org.apache.commons.io.FileUtils;

/**
 * Accepts one connection on port 8083 and stores the received file, looping on
 * {@link FileChannel#transferFrom} until the announced length has arrived.
 *
 * <p>Protocol: the client first sends 16 bytes (file length, then CRC32, each
 * as a long), then the file content. The server answers with 16 bytes (bytes
 * received, then CRC32 of the output file).
 *
 * <p>Arguments: {@code args[0]} is the path of the output file.
 */
public class ZeroCopyServer {

    public static void main(String[] args) throws IOException {

        final File outputFile = new File(args[0]);

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        try {
            serverSocketChannel.socket().bind(new InetSocketAddress(8083));
            SocketChannel socketChannel = serverSocketChannel.accept();
            try {
                //read input file length and CRC32 checksum sent by client
                ByteBuffer request = ByteBuffer.allocate(16);
                readFully(socketChannel, request);
                request.flip();
                long length = request.getLong();
                long checksumCRC32 = request.getLong();

                long totalBytesTransferFrom = 0;
                FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
                try {
                    FileChannel fileChannel = fileOutputStream.getChannel();
                    // transferFrom may return before all requested bytes have arrived,
                    // so keep appending at the current offset until done.
                    while (totalBytesTransferFrom < length) {
                        long transferFromByteCount = fileChannel.transferFrom(socketChannel, totalBytesTransferFrom, length-totalBytesTransferFrom);
                        if (transferFromByteCount <= 0){
                            // The peer closed the connection before sending everything.
                            break;
                        }
                        totalBytesTransferFrom += transferFromByteCount;
                    }
                } finally {
                    // Close the output file before it is re-read for the checksum.
                    fileOutputStream.close();
                }

                long outChecksumCRC32 = FileUtils.checksumCRC32(outputFile);

                //write output file length and CRC32 checksum back to client
                ByteBuffer response = ByteBuffer.allocate(16);
                response.putLong(totalBytesTransferFrom);
                response.putLong(outChecksumCRC32);
                response.flip();
                while (response.hasRemaining()) {
                    socketChannel.write(response);
                }

                System.out.println("CRC32 equal= " + (checksumCRC32 == outChecksumCRC32));
            } finally {
                socketChannel.close();
            }
        } finally {
            serverSocketChannel.close();
        }

    }

    /**
     * Reads from {@code channel} until {@code buffer} has no space remaining.
     * A single read() may return fewer bytes than the buffer holds.
     *
     * @throws EOFException if the channel reaches end of stream first
     */
    private static void readFully(SocketChannel channel, ByteBuffer buffer) throws IOException {
        while (buffer.hasRemaining()) {
            if (channel.read(buffer) < 0) {
                throw new EOFException("connection closed after " + buffer.position()
                        + " of " + buffer.capacity() + " expected bytes");
            }
        }
    }
}
like image 58
Sergei Rodionov Avatar answered Oct 22 '22 02:10

Sergei Rodionov