Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Java - ReadObject with nio

In a traditional blocking-thread server, I would do something like this

class ServerSideThread {

    ObjectInputStream in;
    ObjectOutputStream out;
    Engine engine;

    public ServerSideThread(Socket socket, Engine engine) {
        in = new ObjectInputStream(socket.getInputStream());
        out = new ObjectOutputStream(socket.getOutputStream());
        this.engine = engine;
    }

    public void sendMessage(Message m) {
        out.writeObject(m);
    }

    public void run() {
        while(true) {
            Message m = (Message)in.readObject();
            engine.queueMessage(m,this); // give the engine a message with this as a callback
        }
    }
}

Now, the object can be expected to be quite large. In my nio loop, I can't simply wait for the object to come through, all my other connections (with much smaller workloads) will be waiting on me.

How can I only get notified that a connection has the entire object before it tells my nio channel it's ready?

like image 728
corsiKa Avatar asked May 02 '11 22:05

corsiKa


2 Answers

You can write the object to a ByteArrayOutputStream allowing you to give the length before an object sent. On the receiving side, read the amount of data required before attempting to decode it.

However, you are likely to find it much simpler and more efficient to use blocking IO (rather than NIO) with Object*Stream


Edit something like this

public static void send(SocketChannel socket,  Serializable serializable) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    for(int i=0;i<4;i++) baos.write(0);
    ObjectOutputStream oos = new ObjectOutputStream(baos);
    oos.writeObject(serializable);
    oos.close();
    final ByteBuffer wrap = ByteBuffer.wrap(baos.toByteArray());
    wrap.putInt(0, baos.size()-4);
    socket.write(wrap);
}

private final ByteBuffer lengthByteBuffer = ByteBuffer.wrap(new byte[4]);
private ByteBuffer dataByteBuffer = null;
private boolean readLength = true;

public Serializable recv(SocketChannel socket) throws IOException, ClassNotFoundException {
    if (readLength) {
        socket.read(lengthByteBuffer);
        if (lengthByteBuffer.remaining() == 0) {
            readLength = false;
            dataByteBuffer = ByteBuffer.allocate(lengthByteBuffer.getInt(0));
            lengthByteBuffer.clear();
        }
    } else {
        socket.read(dataByteBuffer);
        if (dataByteBuffer.remaining() == 0) {
            ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(dataByteBuffer.array()));
            final Serializable ret = (Serializable) ois.readObject();
            // clean up
            dataByteBuffer = null;
            readLength = true;
            return ret;
        }
    }
    return null;
}
like image 125
Peter Lawrey Avatar answered Oct 06 '22 20:10

Peter Lawrey


Inspired by the code above I've created a (GoogleCode project)

It includes a simple unit test:

SeriServer server = new SeriServer(6001, nthreads);
final SeriClient client[] = new SeriClient[nclients];

//write the data with multiple threads to flood the server

for (int cnt = 0; cnt < nclients; cnt++) {
    final int counterVal = cnt;
    client[cnt] = new SeriClient("localhost", 6001);
    Thread t = new Thread(new Runnable() {
         public void run() {
             try {
                for (int cnt2 = 0; cnt2 < nsends; cnt2++) {
                   String msg = "[" + counterVal + "]";                       
                   client[counterVal].send(msg);
                 }
             } catch (IOException e) {
                 e.printStackTrace();
                 fail();
             }
         }
         });
    t.start();
 }

 HashMap<String, Integer> counts = new HashMap<String, Integer>();
   int nullCounts = 0;
   for (int cnt = 0; cnt < nsends * nclients;) {
       //read the data from a vector (that the server pool automatically fills
       SeriDataPackage data = server.read();  
       if (data == null) {
              nullCounts++;
              System.out.println("NULL");
              continue;
       }

       if (counts.containsKey(data.getObject())) {
              Integer c = counts.get(data.getObject());
              counts.put((String) data.getObject(), c + 1);
        } else {
              counts.put((String) data.getObject(), 1);
        }
        cnt++;
        System.out.println("Received: " + data.getObject());
   }

   // asserts the results
   Collection<Integer> values = counts.values();
   for (Integer value : values) {
        int ivalue = value;
        assertEquals(nsends, ivalue);
        System.out.println(value);
   }
   assertEquals(counts.size(), nclients);
   System.out.println(counts.size());
   System.out.println("Finishing");
   server.shutdown();
like image 45
Feiteira Avatar answered Oct 06 '22 19:10

Feiteira