
Java non-blocking IO selector causing channel register to block

I have two threads that use Java NIO for non-blocking sockets. This is what each thread does:

Thread 1: A loop that calls on the select() method of a selector. If any keys are available, they are processed accordingly.

Thread 2: Occasionally registers a SocketChannel to the selector by calling register().

The problem is that unless the timeout for select() is very small (around 100 ms), the call to register() blocks indefinitely. This happens even though the channel is configured as non-blocking and the Javadocs state that the Selector object is thread-safe (though its selection keys are not, I know).

Does anyone have any ideas about what the issue could be? The application works perfectly if I put everything in one thread, but I'd really like to have separate threads. Any help is appreciated. I've posted my example code below:

Change the select(1000) to select(100) and it'll work. Leave it as select() or select(1000) and it won't.


import java.io.IOException;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class UDPSocket
{
 private DatagramChannel clientChannel;
 private String dstHost;
 private int dstPort;
 private static Selector recvSelector;
 private static volatile boolean initialized;
 private static ExecutorService eventQueue = Executors.newSingleThreadExecutor();

 public static void init()
 {
  initialized = true;

  try
  {
   recvSelector = Selector.open();
  }
  catch (IOException e)
  {
   System.err.println(e);
  }

  // Thread 1: runs the select loop
  Thread t = new Thread(new Runnable()
  {
   @Override
   public void run()
   {
    while(initialized)
    {
     readData();
     Thread.yield();
    }
   }
  });
  t.start();
 }

 public static void shutdown()
 {
  initialized = false;
 }

 private static void readData()
 {
  try
  {
   int numKeys = recvSelector.select(1000);

   if (numKeys > 0)
   {
    Iterator<SelectionKey> i = recvSelector.selectedKeys().iterator();

    while(i.hasNext())
    {
     SelectionKey key = i.next();
     i.remove();

     if (key.isValid() && key.isReadable())
     {
      DatagramChannel channel = (DatagramChannel) key.channel();

      // allocate every time we receive so that it's a copy that won't get erased
      final ByteBuffer buffer = ByteBuffer.allocate(Short.MAX_VALUE);
      channel.receive(buffer);
      buffer.flip();
      final SocketSubscriber subscriber = (SocketSubscriber) key.attachment();

      // let user handle event on a dedicated thread
      eventQueue.execute(new Runnable()
      {
       @Override
       public void run()
       {
        subscriber.onData(buffer);
       }
      });
     }
    }
   }
  }
  catch (IOException e)
  {
   System.err.println(e);
  }
 }

 public UDPSocket(String dstHost, int dstPort)
 {
  try
  {
   this.dstHost = dstHost;
   this.dstPort = dstPort;
   clientChannel = DatagramChannel.open();
   clientChannel.configureBlocking(false);
  }
  catch (IOException e)
  {
   System.err.println(e);
  }
 }

 // Thread 2: called from a thread other than the select loop
 public void addListener(SocketSubscriber subscriber)
 {
  try
  {
   DatagramChannel serverChannel = DatagramChannel.open();
   serverChannel.configureBlocking(false);
   DatagramSocket socket = serverChannel.socket();
   socket.bind(new InetSocketAddress(dstPort));
   SelectionKey key = serverChannel.register(recvSelector, SelectionKey.OP_READ);
   key.attach(subscriber);
  }
  catch (IOException e)
  {
   System.err.println(e);
  }
 }

 public void send(ByteBuffer buffer)
 {
  try
  {
   clientChannel.send(buffer, new InetSocketAddress(dstHost, dstPort));
  }
  catch (IOException e)
  {
   System.err.println(e);
  }
 }

 public void close()
 {
  try
  {
   clientChannel.close();
  }
  catch (IOException e)
  {
   System.err.println(e);
  }
 }
}


import java.nio.ByteBuffer;

public interface SocketSubscriber
{
 public void onData(ByteBuffer data);
}

Example usage:


import java.nio.ByteBuffer;

public class Test implements SocketSubscriber
{
 public static void main(String[] args) throws Exception
 {
  UDPSocket.init();
  UDPSocket test = new UDPSocket("localhost", 1234);
  test.addListener(new Test());
  UDPSocket test2 = new UDPSocket("localhost", 4321);
  test2.addListener(new Test());
  System.out.println("Listening...");
  ByteBuffer buffer = ByteBuffer.allocate(500);
  test.send(buffer);
  buffer.rewind();
  test2.send(buffer);
  System.out.println("Data sent...");
  Thread.sleep(5000);
  UDPSocket.shutdown();
 }

 @Override
 public void onData(ByteBuffer data)
 {
  System.out.println("Received " + data.limit() + " bytes of data.");
 }
}

user208301 asked Jul 06 '10 18:07




2 Answers

The Selector has several documented levels of internal synchronization, and you are running into them all. Call wakeup() on the selector before you call register(). Make sure the select() loop works correctly if there are zero selected keys, which is what will happen on wakeup().
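As a rough illustration of the "zero selected keys" point, here is a minimal sketch. The class and method names (WakeupDemo, selectOnce, runDemo) are made up for this example and are not part of the question's code. It relies on the documented behaviour that a wakeup() issued while no selection is in progress makes the next select() return immediately, and it iterates the selected-key set without assuming that anything was selected.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;

// Hypothetical demo class, not part of the original question.
final class WakeupDemo {

    // Runs one select() pass and returns how many valid keys were handled.
    // Works the same whether select() returned because a channel is ready
    // or because another thread called wakeup().
    static int selectOnce(Selector selector) throws IOException {
        selector.select(); // may return 0 after a wakeup()
        int handled = 0;
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) { // simply skipped when nothing was selected
            SelectionKey key = it.next();
            it.remove();
            if (key.isValid()) {
                handled++; // real code would read from key.channel() here
            }
        }
        return handled;
    }

    // Calls wakeup() first, then select(): the pending wakeup makes
    // select() return right away with no keys selected.
    static int runDemo() {
        try (Selector selector = Selector.open()) {
            selector.wakeup();
            return selectOnce(selector);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("handled " + runDemo() + " keys");
    }
}
```

Note that wakeup() on its own only gets the selector thread out of select(); if that thread loops straight back into select() before register() runs, register() can block again, which is the gap the second answer closes with a lock.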

user207421 answered Sep 22 '22 06:09



I ran into the same issue today (that is "wakeupAndRegister" not being available). I hope my solution might be helpful:

Create a sync object:

Object registeringSync = new Object();

Register a channel by doing:

synchronized (registeringSync) {
  selector.wakeup();  // Wakes up a CURRENT or (important) NEXT select
  // !!! Might run into a deadlock "between" these lines if not using the lock !!!
  // To force it, insert Thread.sleep(1000); here
  channel.register(selector, ...);
}

The thread should do the following:

public void run() {    
  while (initialized) {
    if (selector.select() != 0) {  // Blocks until "wakeup"
      // Iterate through selected keys
    }
    synchronized (registeringSync) { }  // Cannot continue until "register" is complete
  }
}
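
A different way to get the same effect, swapped in here as an alternative to the lock above rather than taken from it, is to never call register() from a foreign thread at all: other threads queue a registration request and call wakeup(), and the selector thread performs the register() itself between select() calls. The sketch below is only an illustration under that assumption; QueuedRegistration, registerLater, loop, stop and demo are hypothetical names.

```java
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: registrations are executed by the selector thread.
final class QueuedRegistration {
    private final Selector selector;
    private final Queue<Runnable> pending = new ConcurrentLinkedQueue<>();
    private volatile boolean running = true;

    QueuedRegistration(Selector selector) {
        this.selector = selector;
    }

    // Safe to call from any thread: queues the request, then wakes the selector.
    void registerLater(DatagramChannel channel, Object attachment, CountDownLatch done) {
        pending.add(() -> {
            try {
                channel.register(selector, SelectionKey.OP_READ, attachment);
            } catch (ClosedChannelException e) {
                System.err.println(e);
            } finally {
                done.countDown();
            }
        });
        selector.wakeup(); // also covers the case where select() has not started yet
    }

    // Selector thread: select, then run queued registrations on this thread.
    void loop() throws IOException {
        while (running) {
            selector.select();
            Runnable task;
            while ((task = pending.poll()) != null) {
                task.run();
            }
            selector.selectedKeys().clear(); // real code would process the keys here
        }
    }

    void stop() {
        running = false;
        selector.wakeup();
    }

    // Registers one channel from the calling thread's point of view and
    // reports whether the channel ended up registered.
    static boolean demo() {
        try (Selector selector = Selector.open();
             DatagramChannel channel = DatagramChannel.open()) {
            channel.configureBlocking(false);
            QueuedRegistration q = new QueuedRegistration(selector);
            Thread t = new Thread(() -> {
                try {
                    q.loop();
                } catch (IOException e) {
                    System.err.println(e);
                }
            });
            t.start();
            CountDownLatch done = new CountDownLatch(1);
            q.registerLater(channel, "subscriber", done);
            boolean finished = done.await(5, TimeUnit.SECONDS);
            boolean registered = finished && channel.isRegistered();
            q.stop();
            t.join();
            return registered;
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("registered: " + demo());
    }
}
```

The trade-off is that the caller does not get the SelectionKey back synchronously; if it needs the key, it has to wait on something like the latch used here.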
Tobias81 answered Sep 23 '22 06:09
