Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Producer-consumer inter-thread communication

having trouble with inter-thread communication and "solved" it by using "dummy messages" all over the place. Is this a bad idea? What are possible solutions?

Example Problem i have.

main thread starts a thread for processing and inserting records into database. main thread reads a possibly huge file and puts one record (object) after another into a blockingqueue. processing thread reads from queue and does work.

How do I tell "processing thread" to stop? Queue can be empty but work is not done and the main thread does not now either when processing thread has finished work and can't interrupt it.

So processing thread does

while (queue.size() > 0 || !Thread.currentThread().isInterrupted()) {
    MyObject object= queue.poll(100, TimeUnit.MILLISECONDS);
    if (object != null) {
        String data = object.getData();
        if (data.equals("END")) {
            break;
        }
    // do work
    }
}
// clean-up
synchronized queue) {
    queue.notifyAll();
}
return;

and main thread

// ...start processing thread...
while(reader.hasNext(){
    // ...read whole file and put data in queue...
}
MyObject dummy = new MyObject();
dummy.setData("END");
queue.put(dummy);
//Note: empty queue here means work is done
while (queue.size() > 0) {
    synchronized (queue) {
        queue.wait(500); // over-cautios locking prevention i guess
    }
}

Note that insertion must be in same transaction and transaction can't be handled by main thread.

What would be a better way of doing this? (I'm learning and don't want to start "doing it the wrong way")

like image 646
beginner_ Avatar asked Dec 21 '11 19:12

beginner_


2 Answers

These dummy message is valid. It is called "poison". Something that the producer sends to the consumer to make it stop.

Other possibility is to call Thread.interrupt() somewhere in the main thread and catch and handle the InterruptedException accordingly, in the worker thread.

like image 199
Victor Stafusa Avatar answered Sep 28 '22 07:09

Victor Stafusa


"solved" it by using "dummy messages" all over the place. Is this a bad idea? What are possible solutions?

It's not a bad idea, it's called "Poison Pills" and is a reasonable way to stop a thread-based service.

But it only works when the number of producers and consumers is known.

In code you posted, there are two threads, one is "main thread", which produces data, the other is "processing thread", which consumes data, the "Poison Pills" works well for this circumstance.

But to imagine, if you also have other producers, how does consumer know when to stop (only when all producers send "Poison Pills"), you need to know exactly the number of all the producers, and to check the number of "Poison Pills" in consumer, if it equals to the number of producers, which means all producers stopped working, then consumer stops.

In "main thread", you need to catch the InterruptedException, since if not, "main thread" might not able to set the "Poison Pill". You can do it like below,

...
try {
    // do normal processing
} catch (InterruptedException e) { /*  fall through  */  }
finally {
    MyObject dummy = new MyObject();
    dummy.setData("END");
    ...
}
...

Also, you can try to use the ExecutorService to solve all your problem.

(It works when you just need to do some works and then stop when all are finished)

void doWorks(Set<String> works, long timeout, TimeUnit unit)
    throws InterruptedException {
    ExecutorService exec = Executors.newCachedThreadPool();
    try {
        for (final String work : works)
            exec.execute(new Runnable() {
                    public void run() {
                        ...
                    }
                });
    } finally {
        exec.shutdown();
        exec.awaitTermination(timeout, unit);
    }
}

I'm learning and don't want to start "doing it the wrong way"

You might need to read the Book: Java Concurrency in Practice. Trust me, it's the best.

like image 43
Chris Zheng Avatar answered Sep 28 '22 07:09

Chris Zheng