Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Suspend consumer in producer/consumer pattern

I have producer and consumer connected with BlockingQueue.

Consumer wait records from queue and process it:

Record r = mQueue.take();
process(r);

I need pause this process for a while from other thread. How to implement it?

Now I think implement it such, but it's looks like bad solution:

private Object mLock = new Object();
private boolean mLocked = false;

public void lock() {
    mLocked = true;
}

public void unlock() {
    mLocked = false;
    mLock.notify();

}

public void run() {
    ....
            Record r = mQueue.take();
            if (mLocked) {
                mLock.wait();
            }
            process(r);
}
like image 858
HotIceCream Avatar asked Jul 22 '15 13:07

HotIceCream


2 Answers

I think your solution is simple and elegant, and think you should keep it with some modifications. The modifications I propose are synchronization.

Without it, thread interference and memory consistancy errors can (and very often does) occur. On top of that, you can't wait or notify on a lock you don't own (and you own it if you have it inside a synchronized block..). The fix is easy, just add a mLock synchronize block where you wait/notify on it. Also, as you're changing mLocked from a different thread you will want to mark it volatile.

private Object mLock = new Object();
private volatile boolean mLocked = false;

public void lock() {
    mLocked = true;
}

public void unlock() {
    synchronized(mlock) {
        mLocked = false;
        mLock.notify();
    }

}

public void run() {
    ....
            Record r = mQueue.take();
            synchronized(mLock) {
                while (mLocked) {
                    mLock.wait();
                }
            }
            process(r);
}
like image 119
ddmps Avatar answered Sep 19 '22 17:09

ddmps


You can use java.util.concurrent.locks.Condition Java docs to pause for a while based on same condition.

This is approach looks clean to me and ReentrantLock mechanism has better throughput than synchronized. Read below excerpt from IBM article

As a bonus, the implementation of ReentrantLock is far more scalable under contention than the current implementation of synchronized. (It is likely that there will be improvements to the contended performance of synchronized in a future version of the JVM.) This means that when many threads are all contending for the same lock, the total throughput is generally going to be better with ReentrantLock than with synchronized.


BlockingQueue are best known for solution of producer-consumer problem, and it also uses Condition for waiting.

See below example taken from Java doc's Condition, which is a sample implementation of producer - consumer pattern.

class BoundedBuffer {
   final Lock lock = new ReentrantLock();
   final Condition notFull  = lock.newCondition(); 
   final Condition notEmpty = lock.newCondition(); 

   final Object[] items = new Object[100];
   int putptr, takeptr, count;

   public void put(Object x) throws InterruptedException {
     lock.lock();
     try {
       while (count == items.length)
         notFull.await();
       items[putptr] = x;
       if (++putptr == items.length) putptr = 0;
       ++count;
       notEmpty.signal();
     } finally {
       lock.unlock();
     }
   }

   public Object take() throws InterruptedException {
     lock.lock();
     try {
       while (count == 0)
         notEmpty.await();
       Object x = items[takeptr];
       if (++takeptr == items.length) takeptr = 0;
       --count;
       notFull.signal();
       return x;
     } finally {
       lock.unlock();
     }
   }
 }

Further reading:

  • https://javabeanz.wordpress.com/2007/07/12/using-reentrant-locks-for-thread-synchronization/
like image 38
hagrawal Avatar answered Sep 20 '22 17:09

hagrawal