
Piping data between threads with Java

I am writing a multi-threaded application that mimics a movie theater. Each person involved is its own thread and concurrency must be done completely by semaphores. The only issue I am having is how to basically link threads so that they can communicate (via a pipe for instance).

For instance:

Customer[1], which is a thread, acquires a semaphore that lets it walk up to the Box Office. Now Customer[1] must tell the Box Office Agent that they want to see movie "X". Then BoxOfficeAgent[1], also a thread, must check that the movie isn't full and either sell a ticket or tell Customer[1] to pick another movie.

How do I pass that data back and forth while still maintaining concurrency with the semaphores?

Also, the only class I can use from java.util.concurrent is the Semaphore class.

JustinY17 asked Apr 09 '11 04:04


1 Answer

One easy way to pass data back and forth between threads is to use an implementation of the BlockingQueue<E> interface, located in the package java.util.concurrent.

This interface has methods to add elements to the collection with different behaviors:

  • add(E): adds the element if possible, otherwise throws an IllegalStateException
  • boolean offer(E): returns true if the element has been added, false otherwise
  • boolean offer(E, long, TimeUnit): tries to add the element, waiting up to the specified amount of time for space to become available
  • put(E): blocks the calling thread until the element has been added

It also defines methods for element retrieval with similar behaviors:

  • take(): blocks until there's an element available
  • poll(long, TimeUnit): waits up to the specified amount of time for an element, and returns null if none becomes available
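As a rough illustration of the differences listed above, here is a small sketch using an ArrayBlockingQueue of capacity 1. The class name QueueMethodsDemo, the run() helper and the 50 ms timeouts are made up for this example; only the queue methods themselves come from java.util.concurrent.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original answer.
class QueueMethodsDemo {
    // Returns a short trace of what each call did on a queue of capacity 1.
    static String run() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        StringBuilder trace = new StringBuilder();

        trace.append(queue.offer("X")).append(' ');  // there is room
        trace.append(queue.offer("Y")).append(' ');  // queue is full, no waiting
        // Timed offer: waits up to 50 ms for space, then gives up.
        trace.append(queue.offer("Y", 50, TimeUnit.MILLISECONDS)).append(' ');
        try {
            queue.add("Y");                           // full, so add throws
        } catch (IllegalStateException full) {
            trace.append("ISE ");
        }
        trace.append(queue.take()).append(' ');       // an element is present, returns at once
        // Timed poll: waits up to 50 ms for an element, then returns null.
        trace.append(queue.poll(50, TimeUnit.MILLISECONDS));
        return trace.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints "true false false ISE X null"
    }
}
```

A put("Y") on the full queue is left out on purpose: with no consumer thread it would block forever.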

The implementations I use most frequently are: ArrayBlockingQueue, LinkedBlockingQueue and SynchronousQueue.

The first one, ArrayBlockingQueue, has a fixed size, defined by a parameter passed to its constructor.

The second, LinkedBlockingQueue, is unbounded by default (its capacity is Integer.MAX_VALUE unless you pass a smaller one to the constructor). In practice it accepts any element: offer returns true immediately and add never throws an exception.

The third, and to me the most interesting one, SynchronousQueue, is exactly a pipe. You can think of it as a queue with size 0. It will never keep an element: this queue will only accept elements if there's some other thread trying to retrieve elements from it. Conversely, a retrieval operation will only return an element if there's another thread trying to push it.
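To make the "queue with size 0" idea concrete, here is a minimal sketch of a handoff between two threads. The class name HandoffDemo, the run() helper and the "movie X" payload are assumptions chosen to match the question; the homework itself would not be allowed to use SynchronousQueue.

```java
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class, not part of the original answer.
class HandoffDemo {
    static String run() throws InterruptedException {
        SynchronousQueue<String> pipe = new SynchronousQueue<>();

        // No thread is waiting in take(), so a non-blocking offer is rejected.
        boolean acceptedWithoutTaker = pipe.offer("movie X");

        String[] received = new String[1];
        Thread agent = new Thread(() -> {
            try {
                received[0] = pipe.take();   // blocks until a customer puts
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        agent.start();

        pipe.put("movie X");                 // blocks until the agent takes
        agent.join();                        // after join, received[0] is visible here
        return acceptedWithoutTaker + " " + received[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints "false movie X"
    }
}
```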

To fulfill the homework requirement of synchronization done exclusively with semaphores, you could get inspired by the description I gave you about the SynchronousQueue, and write something quite similar:

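import java.util.concurrent.Semaphore;

class Pipe<E> {
  private E e;

  // read has a permit only while an element is waiting to be taken
  private final Semaphore read = new Semaphore(0);
  // write has a permit only while the slot is free
  private final Semaphore write = new Semaphore(1);

  // acquire() can be interrupted while blocked, so both methods declare it
  public final void put(final E e) throws InterruptedException {
    write.acquire();
    this.e = e;
    read.release();
  }

  public final E take() throws InterruptedException {
    read.acquire();
    E e = this.e;
    write.release();
    return e;
  }
}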

Notice that this class presents similar behavior to what I described about the SynchronousQueue.

When put(E) is called, it acquires the write semaphore, leaving it with no permits, so that another call to put(E) blocks at its first line. The method then stores a reference to the object being passed and releases the read semaphore. This release allows a thread calling take() to proceed.

The first step of the take() method is then, naturally, to acquire the read semaphore, which both waits for an element to be available and prevents any other thread from retrieving it concurrently. After the element has been retrieved and kept in a local variable (exercise: what would happen if that line, E e = this.e, were removed?), the method releases the write semaphore, so that put(E) may be called again by any thread, and returns what was saved in the local variable.

As an important remark, observe that the reference to the object being passed is kept in a private field, and the methods take() and put(E) are both final. This is of utmost importance, and often missed. If these methods were not final (or worse, the field not private), an inheriting class would be able to alter the behavior of take() and put(E) breaking the contract.
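For the box office scenario in the question, one possible arrangement (a sketch, not the only design) is a pair of such pipes: one carrying requests from the customer to the agent, one carrying replies back. Everything here besides Semaphore is hypothetical: the class BoxOfficeDemo, the run() helper, the seatsLeft parameter and the reply strings are made up for illustration. The nested Pipe is declared final instead of marking each method final, and its methods declare InterruptedException because Semaphore.acquire() can throw it.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class, not part of the original answer.
class BoxOfficeDemo {
    // One-slot pipe built only from semaphores.
    static final class Pipe<E> {
        private E e;
        private final Semaphore read = new Semaphore(0);
        private final Semaphore write = new Semaphore(1);

        void put(E e) throws InterruptedException {
            write.acquire();
            this.e = e;
            read.release();
        }

        E take() throws InterruptedException {
            read.acquire();
            E e = this.e;
            write.release();
            return e;
        }
    }

    // One customer asks one agent for a ticket to movie "X".
    static String run(int seatsLeft) throws InterruptedException {
        Pipe<String> requests = new Pipe<>();  // customer -> agent
        Pipe<String> replies = new Pipe<>();   // agent -> customer

        Thread agent = new Thread(() -> {
            try {
                String movie = requests.take();          // wait for a request
                replies.put(seatsLeft > 0
                        ? "ticket for " + movie
                        : movie + " is full");           // answer the customer
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        agent.start();

        requests.put("X");                // the customer states the movie
        String answer = replies.take();   // and waits for the agent's reply
        agent.join();
        return answer;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(3)); // prints "ticket for X"
        System.out.println(run(0)); // prints "X is full"
    }
}
```

With several customers per agent, each customer would still need to hold the box office semaphore while using the two pipes, so that replies cannot be picked up by the wrong customer.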

Finally, you could avoid the need to declare a local variable in the take() method by using try {} finally {} as follows:

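class Pipe<E> {
  // ...
  public final E take() throws InterruptedException {
    // acquire outside the try block: if acquire() is interrupted,
    // the write semaphore must not be released
    read.acquire();
    try {
      return e;
    } finally {
      write.release();
    }
  }
}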

Here, the point of this example is just to show a use of try/finally that often goes unnoticed among inexperienced developers. Obviously, in this case, there's no real gain.

Oh damn, I've mostly finished your homework for you. In return, and for you to test your knowledge of semaphores, why don't you implement some of the other methods defined by the BlockingQueue contract? For example, you could implement an offer(E) method and a poll(long, TimeUnit)!

Good luck.

Bruno Reis answered Oct 01 '22 19:10