
Asynchronous update of promise in Netty Nio

I have a client-server architecture in which both sides exchange information. I want the server to return the number of connected channels, and I want to deliver that message to the clients through a promise. My code is:

public static void callBack () throws Exception{

   String host = "localhost";
   int port = 8080;

   try {
       Bootstrap b = new Bootstrap();
       b.group(workerGroup);
       b.channel(NioSocketChannel.class);
       b.option(ChannelOption.SO_KEEPALIVE, true);
       b.handler(new ChannelInitializer<SocketChannel>() {
        @Override
           public void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new RequestDataEncoder(), new ResponseDataDecoder(), new ClientHandler(promise));
           }
       });
       ChannelFuture f = b.connect(host, port).sync();
       //f.channel().closeFuture().sync();
   }
   finally {
    //workerGroup.shutdownGracefully();
   }
}

public static void main(String[] args) throws Exception {

  callBack();
  while (true) {

    Object msg = promise.get();
    System.out.println("The number if the connected clients is not two");
    int ret = Integer.parseInt(msg.toString());
    if (ret == 2){
        break;
    }
  }
  System.out.println("The number if the connected clients is two");
}

When I run one client, it always prints the message "The number if the connected clients is not two", and the returned number is always one. When I start a second client, that client always receives two as the return value, but the first client still receives one. I cannot find the correct way to update the promise for the first client.

EDIT: Client handler:

public class ClientHandler extends ChannelInboundHandlerAdapter {
  public final Promise<Object> promise;
  public ClientHandler(Promise<Object> promise) {
      this.promise = promise;
  }

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
      RequestData msg = new RequestData();
      msg.setIntValue(123);
      msg.setStringValue("all work and no play makes jack a dull boy");
      ctx.writeAndFlush(msg);
  }

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      System.out.println(msg);
      promise.trySuccess(msg);
  }
} 

The client handler above stores the message received from the server in the promise.

konstantin asked Oct 30 '17 12:10



1 Answer

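In the Netty framework, a Promise and a Future are write-once objects: once a value has been set, later attempts to set another value are ignored. This principle makes them easier to use in a multithreaded environment, but it also means that a single Promise cannot deliver a series of updates.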
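The write-once behavior can be tried out without Netty. The sketch below uses the JDK's CompletableFuture as a stand-in (an assumption made only so the example runs on a plain JDK); its complete() method behaves like Promise.trySuccess() in the respect that matters here: the second call returns false and the stored value stays the same. The class and method names are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

// Stand-in demo: CompletableFuture replaces Netty's Promise here so the
// example runs without Netty on the classpath.
class WriteOnceDemo {
    // Completes the same future twice and returns what a reader sees afterwards.
    static int completeTwice() {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        boolean first = future.complete(1);   // true: the value is stored
        boolean second = future.complete(2);  // false: already completed, value ignored
        System.out.println("first=" + first + ", second=" + second);
        return future.join();                 // still the first value
    }

    public static void main(String[] args) {
        System.out.println("value=" + completeTwice());
    }
}
```

This matches the symptom in the question: once the first client's promise holds 1, every later promise.get() returns 1 again, no matter what the handler receives afterwards.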

Since a Promise doesn't do what you want, we need to look for another mechanism that fits your requirements. These basically boil down to:

  • Read from multiple threads
  • Write from a single thread only (inside a Netty channel, the read method of a handler is executed by only 1 thread at a time, unless the handler is marked shareable and used by several channels)

For these requirements, the best match is a volatile variable: every read sees the most recently written value, and a single writer thread can update it safely without extra locking.
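As a minimal sketch of that single-writer pattern, independent of Netty, the following uses a plain thread in place of the channel's event loop. The class name and the writeThenRead helper are hypothetical and exist only for this illustration.

```java
// Single-writer / many-reader sketch with a volatile field.
// A plain Thread stands in for the Netty event loop that would call channelRead.
class VolatileCounterDemo {
    private static volatile int connectedClients = 0;

    // Starts one writer thread, then polls until the new value becomes visible.
    static int writeThenRead(int newValue) {
        Thread writer = new Thread(() -> connectedClients = newValue);
        writer.start();
        while (connectedClients != newValue) {
            Thread.yield(); // busy-wait: simple, but burns CPU while waiting
        }
        return connectedClients;
    }

    public static void main(String[] args) {
        System.out.println("connected clients: " + writeThenRead(2));
    }
}
```

Without volatile, the polling loop would have no guarantee of ever seeing the writer's update.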

Using a volatile variable requires some modifications to your code. We cannot easily pass a reference to the variable into the handler, so instead we pass a function that updates the backing variable.

private static volatile int connectedClients = 0;
public static void callBack () throws Exception{
    //....
           ch.pipeline().addLast(new RequestDataEncoder(), new ResponseDataDecoder(),
                                 new ClientHandler(i -> {connectedClients = i;}));
    //....
}

public static void main(String[] args) throws Exception {

  callBack();
  while (true) {
    System.out.println("The number if the connected clients is not two");
    int ret = connectedClients;
    if (ret == 2){
        break;
    }
  }
  System.out.println("The number if the connected clients is two");
}

// Requires: import java.util.function.IntConsumer;
public class ClientHandler extends ChannelInboundHandlerAdapter {
  public final IntConsumer update;
  public ClientHandler(IntConsumer update) {
      this.update = update;
  }

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
      RequestData msg = new RequestData();
      msg.setIntValue(123);
      msg.setStringValue("all work and no play makes jack a dull boy");
      ctx.writeAndFlush(msg);
  }

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      System.out.println(msg);
      // msg is an Object, so convert it to a String before parsing
      update.accept(Integer.parseInt(msg.toString()));
  }
} 

While the approach above should work, the while loop in main busy-waits and consumes a large share of CPU time, which may affect other parts of your client system. Luckily, this can be solved by adding synchronization: the reader waits on a lock instead of spinning, and the writer wakes it up after each update. By keeping the initial read of connectedClients outside the synchronized block, we still profit from a quick read when the condition is already true, and when it is false we save CPU cycles that can be used by other parts of your system.

To tackle this problem, we use the following steps when reading:

  1. Store the value of connectedClients in a separate variable
  2. Compare this variable with the target value
  3. If they are equal, break out early
  4. If not, enter a synchronized block
  5. Start a while (true) loop
  6. Read the variable again, since the value might have changed by now
  7. Check the condition, and break if it now holds
  8. If not, wait for a change in the value

And the following when writing:

  1. Synchronize on the same lock
  2. Update the value
  3. Wake up all other threads waiting for this value

This can be implemented in code as the following:

private static volatile int connectedClients = 0;
private static final Object lock = new Object();
public static void callBack () throws Exception{
    //....
           ch.pipeline().addLast(new RequestDataEncoder(), new ResponseDataDecoder(),
                                 new ClientHandler(i -> {
               synchronized (lock) {
                   connectedClients = i;
                   lock.notifyAll();
               }
           }));
    //....
}

public static void main(String[] args) throws Exception {

  callBack();
  int connected = connectedClients;
  if (connected != 2) {
      System.out.println("The number if the connected clients is not two before locking");
      synchronized (lock) {
          while (true) {
              connected = connectedClients;
              if (connected == 2)
                  break;
              System.out.println("The number if the connected clients is not two");
              lock.wait();
          }
      }
  }
  System.out.println("The number if the connected clients is two: " + connected );
}
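To experiment with this wait/notify pattern without a running server, the handler callback can be simulated by an ordinary thread. The following is only a sketch under that assumption: WaitNotifyDemo, awaitCount and runDemo are hypothetical names, and the simulated messages 1 and 2 stand in for what the server would send.

```java
import java.util.function.IntConsumer;

class WaitNotifyDemo {
    private static volatile int connectedClients = 0;
    private static final Object lock = new Object();

    // Writer side: what the handler callback does for each server message.
    static final IntConsumer update = i -> {
        synchronized (lock) {
            connectedClients = i;
            lock.notifyAll(); // wake up every thread waiting on the lock
        }
    };

    // Reader side: returns once connectedClients equals the target.
    static int awaitCount(int target) throws InterruptedException {
        int connected = connectedClients;  // fast path, no locking
        if (connected != target) {
            synchronized (lock) {
                // Re-read under the lock; wait() releases the lock while sleeping.
                while ((connected = connectedClients) != target) {
                    lock.wait();
                }
            }
        }
        return connected;
    }

    // Simulates the server reporting 1 and then 2 connected clients.
    static int runDemo() {
        Thread fakeEventLoop = new Thread(() -> {
            update.accept(1);
            update.accept(2);
        });
        fakeEventLoop.start();
        try {
            return awaitCount(2);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("connected: " + runDemo());
    }
}
```

Because the value is re-checked inside the synchronized block before every wait(), an update that arrives between the fast-path read and the lock acquisition is not missed.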

Server side changes

However, not all of your problems are related to the client side.

Judging from the GitHub repository you linked, the server never sends a new message to the clients that are already connected when another client joins. Because of this, the older clients are never notified of the change, so make sure the server does this as well.
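The idea can be sketched in plain Java, without Netty. The repository's server code is not shown here, so everything below is an assumption: BroadcastDemo and its methods are hypothetical, and each IntConsumer stands in for one connected client channel. In a real Netty server, a ChannelGroup could play the role of the client list, since writing to the group writes to every channel in it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

class BroadcastDemo {
    // Hypothetical stand-in for the server's set of connected channels.
    private final List<IntConsumer> clients = new ArrayList<>();

    // Called when a client connects: register it, then send the new
    // count to every client, including the ones that joined earlier.
    synchronized void join(IntConsumer client) {
        clients.add(client);
        int count = clients.size();
        for (IntConsumer c : clients) {
            c.accept(count);
        }
    }

    // Returns the last count the first client received after a second client joined.
    static int firstClientSeesAfterSecondJoin() {
        BroadcastDemo server = new BroadcastDemo();
        int[] firstClientView = {0};
        int[] secondClientView = {0};
        server.join(i -> firstClientView[0] = i);
        server.join(i -> secondClientView[0] = i);
        return firstClientView[0];
    }

    public static void main(String[] args) {
        System.out.println("first client sees: " + firstClientSeesAfterSecondJoin());
    }
}
```

With this kind of broadcast in place, the first client receives a second message after the second client connects, which is the update the client-side code is waiting for.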

Ferrybig answered Sep 22 '22 17:09
