
Do the BlockingQueue methods always throw an InterruptedException when the thread is interrupted?

In one of my Java 6 applications I have a thread that feeds the main thread with data, while also prefetching more records from a DB. It uses an ArrayBlockingQueue queue as a FIFO buffer and its main loop is something along these lines:

while (!Thread.interrupted()) {
    if (source.hasNext()) {
        try {
            queue.put(source.next());
        } catch (InterruptedException e) {
            break;
        }
    } else {
        break;
    }
}

There is code that does a bit of clean-up after the loop terminates, such as poisoning the queue and releasing any resources, but that is pretty much all there is to it.

As it stands, there is no direct communication from the main thread to the feeder thread: the feeder thread is set up with the proper options and then left on its own, using the blocking queue to control the data flow.

The problem appears when the main thread needs to shut down the feeder while the queue is full. Since there is no direct control channel, the shutdown method uses the Thread interface to interrupt() the feeder thread. Unfortunately, in most cases the feeder thread remains blocked in put(), despite being interrupted: no exception is thrown.

From a brief perusal of the interrupt() documentation and the queue implementation source code, it seems to me that quite often put() blocks without using any of the interruptible facilities of the JVM. More specifically, on my current JVM (OpenJDK 1.6b22), it blocks on the sun.misc.Unsafe.park() native method. Perhaps it uses a spinlock or something else, but in any case, this seems to fall under the following case:

If none of the previous conditions hold then this thread's interrupt status will be set.

A status flag is set, but the thread is still blocked in put() and does not iterate further so that the flag can be checked. The result? A zombie thread that just won't die!

  1. Is my understanding of this issue correct, or am I missing something?

  2. What are the possible approaches to fix this issue? Right now I can only think of two solutions:

    a. Calling poll() a bunch of times on the queue to unblock the feeder thread: Ugly and not very reliable from what I've seen, but it mostly works.

    b. Use the offer() method with a timeout instead of put() to allow the thread to check its interrupt status within an acceptable time frame.

Unless I am missing something, this is a somewhat underdocumented caveat of the BlockingQueue implementations in Java. There seem to be some indications of it, e.g. when the documentation suggests poisoning the queues to shut down a worker thread, but I cannot find any explicit reference.

EDIT:

OK, there is a more, uh, drastic variation of solution (a) above: ArrayBlockingQueue.clear(). I think this should always work, even if it's not exactly the definition of elegance...

asked Feb 13 '12 at 00:02 by thkala




2 Answers

I think there are two possible causes of your issue.

  1. As described in The Law of the Sabotaged Doorbell, you may not be handling the interrupt correctly. There you will find:

    What should we do when we call code that may cause an InterruptedException? Don't immediately yank out the batteries! Typically there are two answers to that question:

    Rethrow the InterruptedException from your method. This is usually the easiest and best approach. It is used by the new java.util.concurrent.* package, which explains why we are now constantly coming into contact with this exception.
    Catch it, set interrupted status, return. If you are running in a loop that calls code which may cause the exception, you should set the status back to being interrupted.

    For example:

    while (!Thread.currentThread().isInterrupted()) {
        // do something
        try {
            TimeUnit.SECONDS.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            break;
        }
    }
    
  2. Either source.hasNext() or source.next() is consuming and discarding the interrupt status. See Added below for how I solved this problem.
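
Cause 2 can be reproduced in isolation. The following is a minimal sketch, not the asker's code: swallowingCall() and restoringCall() are hypothetical stand-ins for a source.next() that catches InterruptedException internally. It relies on the documented behaviour that the interrupt status is cleared when InterruptedException is thrown, so a catch block that does nothing leaves the calling loop with nothing to observe.

```java
class SwallowedInterruptDemo {

    // Hypothetical stand-in for a source.next() that swallows the interrupt.
    static void swallowingCall() {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            // Swallowed: the interrupt status was cleared when the
            // exception was thrown, and nothing sets it again.
        }
    }

    // Same call, but the interrupt status is restored for the caller.
    static void restoringCall() {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Interrupts the current thread, runs one of the calls above and reports
    // whether the interrupt status is still visible afterwards.
    static boolean flagSurvives(boolean restore) {
        Thread.currentThread().interrupt(); // simulate an interrupt arriving
        if (restore) {
            restoringCall();
        } else {
            swallowingCall();
        }
        // Read and clear the status so the calling thread is left clean.
        return Thread.interrupted();
    }

    public static void main(String[] args) {
        System.out.println("swallowed, flag survives: " + flagSurvives(false));
        System.out.println("restored, flag survives:  " + flagSurvives(true));
    }
}
```

If the data source behaves like swallowingCall(), a loop guarded by Thread.interrupted() keeps running after the interrupt and goes straight back into put().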

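I am confident that interrupting a thread blocked in ArrayBlockingQueue.put() is an effective solution: put() is declared to throw InterruptedException if the thread is interrupted while waiting.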
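
A small check of that behaviour, as a sketch under stated assumptions: the class and method names are made up for illustration, the queue has capacity 1 and no consumer, and the 200 ms pause is only there to give the feeder time to reach put().

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class PutInterruptDemo {

    // Returns true if a put() on a full queue ended with InterruptedException
    // after the blocked thread was interrupted.
    static boolean blockedPutIsInterrupted() {
        final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1);
        final CountDownLatch done = new CountDownLatch(1);
        final boolean[] sawException = { false };

        Thread feeder = new Thread(new Runnable() {
            public void run() {
                try {
                    queue.put("first");  // fills the queue
                    queue.put("second"); // blocks: nobody consumes
                } catch (InterruptedException e) {
                    sawException[0] = true;
                } finally {
                    done.countDown();
                }
            }
        });
        feeder.start();

        try {
            Thread.sleep(200);  // let the feeder block in put()
            feeder.interrupt();
            // countDown() happens-before await() returns, so the write to
            // sawException is visible here.
            return done.await(5, TimeUnit.SECONDS) && sawException[0];
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("put() was interrupted: " + blockedPutIsInterrupted());
    }
}
```

If this reports true on the JVM in question while the real feeder still hangs, the interrupt is most likely being lost somewhere other than in put().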

Added

I solved problem 2 using a CloseableBlockingQueue which can be closed from the reader end. In this way, once it is closed, all put calls will shortcut. You can then check the closed flag of the queue from the writer.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

// A blocking queue I can close from the pull end.
// Please only use put because offer does not shortcut on close.
class CloseableBlockingQueue<E> extends ArrayBlockingQueue<E> {
  // Flag indicates closed state.
  private volatile boolean closed = false;
  // All blocked threads. Actually this is all threads that are in the process
  // of invoking a put but if put doesn't block then they disappear pretty fast.
  // NB: Container is O(1) for get and almost O(1) (depending on how busy it is) for put.
  private final Container<Thread> blocked;

  // Limited size.
  public CloseableBlockingQueue(int queueLength) {
    super(queueLength);
    blocked = new Container<Thread>(queueLength);
  }

  /**
   * Shortcut to do nothing if closed.
   * Track blocked threads.
   */
  @Override
  public void put(E e) throws InterruptedException {
    if (!closed) {
      Thread t = Thread.currentThread();
      // Hold my node on the stack so removal can be trivial.
      Container.Node<Thread> n = blocked.add(t);
      try {
        super.put(e);
      } finally {
        // Not blocked anymore.
        blocked.remove(n, t);
      }
    }
  }

  /**
   *
   * Shortcut to do nothing if closed.
   */
  @Override
  public E poll() {
    E it = null;
    // Do nothing when closed.
    if (!closed) {
      it = super.poll();
    }
    return it;
  }

  /**
   *
   * Shortcut to do nothing if closed.
   */
  @Override
  public E poll(long l, TimeUnit tu) throws InterruptedException {
    E it = null;
    // Do nothing when closed.
    if (!closed) {
      it = super.poll(l, tu);
    }
    return it;
  }

  /**
   *
   * isClosed
   */
  boolean isClosed() {
    return closed;
  }

  /**
   *
   * Close down everything.
   */
  void close() {
    // Stop all new queue entries.
    closed = true;
    // Must unblock all blocked threads.

    // Walk all blocked threads and interrupt them.
    for (Thread t : blocked) {
      //log("! Interrupting " + t.toString());
      // Interrupt all of them.
      t.interrupt();
    }
  }

  @Override
  public String toString() {
    return blocked.toString();
  }
}

You will also need the Container which is lock-free and O(1) put/get (although it is not strictly a collection). It uses a Ring behind the scenes.

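import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class Container<T> implements Iterable<T> {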

  // The capacity of the container.
  final int capacity;
  // The list.
  AtomicReference<Node<T>> head = new AtomicReference<Node<T>>();

  // Constructor
  public Container(int capacity) {
    this.capacity = capacity;
    // Construct the list.
    Node<T> h = new Node<T>();
    Node<T> it = h;
    // One created, now add (capacity - 1) more
    for (int i = 0; i < capacity - 1; i++) {
      // Add it.
      it.next = new Node<T>();
      // Step on to it.
      it = it.next;
    }
    // Make it a ring.
    it.next = h;
    // Install it.
    head.set(h);
  }

  // Empty ... NOT thread safe.
  public void clear() {
    Node<T> it = head.get();
    for (int i = 0; i < capacity; i++) {
      // Trash the element
      it.element = null;
      // Mark it free.
      it.free.set(true);
      it = it.next;
    }
  }

  // Add a new one.
  public Node<T> add(T element) {
    // Get a free node and attach the element.
    return getFree().attach(element);
  }

  // Find the next free element and mark it not free.
  private Node<T> getFree() {
    Node<T> freeNode = head.get();
    int skipped = 0;
    // Stop when we hit the end of the list 
    // ... or we successfully transit a node from free to not-free.
    while (skipped < capacity && !freeNode.free.compareAndSet(true, false)) {
      skipped += 1;
      freeNode = freeNode.next;
    }
    if (skipped < capacity) {
      // Put the head as next.
      // Doesn't matter if it fails. That would just mean someone else was doing the same.
      head.set(freeNode.next);
    } else {
      // We hit the end! No more free nodes.
      throw new IllegalStateException("Capacity exhausted.");
    }
    return freeNode;
  }

  // Mark it free.
  public void remove(Node<T> it, T element) {
    // Remove the element first.
    it.detach(element);
    // Mark it as free.
    if (!it.free.compareAndSet(false, true)) {
      throw new IllegalStateException("Freeing a freed node.");
    }
  }

  // The Node class. It is static so needs the <T> repeated.
  public static class Node<T> {

    // The element in the node.
    private T element;
    // Are we free?
    private AtomicBoolean free = new AtomicBoolean(true);
    // The next reference in whatever list I am in.
    private Node<T> next;

    // Construct a node of the list
    private Node() {
      // Start empty.
      element = null;
    }

    // Attach the element.
    public Node<T> attach(T element) {
      // Sanity check.
      if (this.element == null) {
        this.element = element;
      } else {
        throw new IllegalArgumentException("There is already an element attached.");
      }
      // Useful for chaining.
      return this;
    }

    // Detach the element.
    public Node<T> detach(T element) {
      // Sanity check.
      if (this.element == element) {
        this.element = null;
      } else {
        throw new IllegalArgumentException("Removal of wrong element.");
      }
      // Useful for chaining.
      return this;
    }

    @Override
    public String toString() {
      return element != null ? element.toString() : "null";
    }
  }

  // Provides an iterator across all items in the container.
  public Iterator<T> iterator() {
    return new UsedNodesIterator<T>(this);
  }

  // Iterates across used nodes.
  private static class UsedNodesIterator<T> implements Iterator<T> {
    // Where next to look for the next used node.

    Node<T> it;
    int limit = 0;
    T next = null;

    public UsedNodesIterator(Container<T> c) {
      // Snapshot the head node at this time.
      it = c.head.get();
      limit = c.capacity;
    }

    public boolean hasNext() {
      if (next == null) {
        // Scan to the next non-free node.
        while (limit > 0 && it.free.get() == true) {
          it = it.next;
          // Step down 1.
          limit -= 1;
        }
        if (limit != 0) {
          next = it.element;
        }
      }
      return next != null;
    }

    public T next() {
      T n = null;
      if ( hasNext () ) {
        // Give it to them.
        n = next;
        next = null;
        // Step forward.
        it = it.next;
        limit -= 1;
      } else {
        // Not there!!
        throw new NoSuchElementException ();
      }
      return n;
    }

    public void remove() {
      throw new UnsupportedOperationException("Not supported.");
    }
  }

  @Override
  public String toString() {
    StringBuilder s = new StringBuilder();
    // Keep counts too.
    int usedCount = 0;
    int freeCount = 0;
    // I will iterate the list myself as I want to count free nodes too.
    Node<T> it = head.get();
    int count = 0;
    s.append("[");
    // Scan to the end.
    while (count < capacity) {
      // Is it in-use?
      if (it.free.get() == false) {
        // Grab its element.
        T e = it.element;
        // Is it null?
        if (e != null) {
          // Good element. Put a comma between it and the previous one.
          if (usedCount > 0) {
            s.append(",");
          }
          s.append(e.toString());
          // Count them.
          usedCount += 1;
        } else {
          // Probably became free while I was traversing.
          // Because the element is detached before the entry is marked free.
          freeCount += 1;
        }
      } else {
        // Free one.
        freeCount += 1;
      }
      // Next
      it = it.next;
      count += 1;
    }
    // Decorate with counts "]used+free".
    s.append("]").append(usedCount).append("+").append(freeCount);
    if (usedCount + freeCount != capacity) {
      // Perhaps something was added/freed while we were iterating.
      s.append("?");
    }
    return s.toString();
  }
}

answered Oct 17 '22 at 03:10 by OldCurmudgeon


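private final AtomicBoolean shutdown = new AtomicBoolean();

// Called from the main thread to stop the feeder.
void shutdown() {
    shutdown.set(true);
}

// Feeder loop. offer() with a timeout throws InterruptedException,
// so the enclosing method must declare or handle it.
while (!shutdown.get()) {
    if (source.hasNext()) {
        Object item = source.next();
        // Retry with a short timeout so the shutdown flag is re-checked regularly.
        while (!shutdown.get() && !queue.offer(item, 100, TimeUnit.MILLISECONDS)) {
            // Queue is still full; loop and check the flag again.
        }
    } else {
        break;
    }
}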

answered Oct 17 '22 at 02:10 by brettw
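
The timed-offer approach (option b in the question) can also be sketched as a self-contained program. This is an illustration under assumptions, not code from either answer: the TimedOfferFeeder class, the String element type and the 100 ms timeout are chosen for the example, and the item being offered when shutdown() is called is simply dropped.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class TimedOfferFeeder implements Runnable {
    private final AtomicBoolean shutdown = new AtomicBoolean();
    private final Iterator<String> source;
    private final BlockingQueue<String> queue;

    TimedOfferFeeder(Iterator<String> source, BlockingQueue<String> queue) {
        this.source = source;
        this.queue = queue;
    }

    // Called from the controlling thread; the feeder notices within ~100 ms.
    void shutdown() {
        shutdown.set(true);
    }

    public void run() {
        try {
            while (!shutdown.get() && source.hasNext()) {
                String item = source.next();
                // Bounded wait instead of put(): the flag is re-checked
                // every time the offer times out on a full queue.
                while (!shutdown.get()
                        && !queue.offer(item, 100, TimeUnit.MILLISECONDS)) {
                    // Queue still full; try again.
                }
            }
        } catch (InterruptedException e) {
            // Still honour interrupts: restore the status and stop.
            Thread.currentThread().interrupt();
        }
    }

    // Returns true if the feeder thread terminates after shutdown()
    // even though the queue is full and nobody is consuming.
    static boolean stopsWhileQueueIsFull() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1);
        TimedOfferFeeder feeder =
                new TimedOfferFeeder(Arrays.asList("a", "b", "c").iterator(), queue);
        Thread thread = new Thread(feeder);
        thread.start();
        try {
            Thread.sleep(300);  // let the feeder fill the queue and start waiting
            feeder.shutdown();
            thread.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !thread.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("feeder stopped: " + stopsWhileQueueIsFull());
    }
}
```

The trade-off is latency against wake-ups: a shorter timeout makes shutdown more responsive but wakes the feeder more often while the queue stays full.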