 

Producer/consumer multithreading

Background

Lacking money for school, I am working night shifts at a tollbooth and using the Internet to teach myself some coding skills, hoping for a better job tomorrow or the online sale of some app I make. Long nights, few customers.

I am tackling multithreading as a topic as I encounter a lot of code in literature (e.g. Android SDK) which uses it, but I still find it obscure.

Spirit

My approach at this point is: try to code the most basic multithreading example I can think of, bang my head against the wall a little, and see if I can stretch my brain into accommodating some novel way of thinking. I am exposing myself to my limits to hopefully surpass them. Feel free to criticise wildly, to the point of nitpicking, and point out better ways of doing what I am trying to do.

Objective

  • Get some advice on how to do the above, based on my efforts so far (code provided)

The exercise

Here's the scope I define:

Definition

Create two classes which work in tandem on the production of data objects and consumption thereof. One Thread creates objects and delivers them to a shared space for the other to pick up and consume. Let's call the producing thread Producer, the consuming thread Consumer and the shared space SharedSpace. The act of producing objects for the other to consume can be likened to this scenario:

`Producer`    (a busy mum making chocolate-covered cakes for her child, up to a limit)
`Consumer`    (a hungry child waiting to eat all cakes the mum makes, until told to stop)
`SharedSpace` (a kitchen table on which the cakes are put as soon as they become ready)
`dataValue`   (a chocolate-dripping cake which MUST be eaten immediately or else...)

To simplify the exercise, I decide not to allow the mum to be cooking as the child eats his cake. She will just wait for the child to finish his cake and instantaneously make another one, up to a certain limit, for good parenting. The essence of the exercise is to practise the signalling of the Threads over achieving any concurrency at all. On the contrary, I am focussing on perfect serialisation, with no polling or "can I go yet?" checks. I suppose I will have to code the follow-on exercise in which mother and child "work" in parallel next.

Approach

  • Have my classes implement the Runnable interface so they have a code entry point of their own

  • Use my classes as constructor arguments to Thread objects which are instantiated and started from the program's main entry point

  • Ensure the main program does not terminate before the Threads do by means of Thread.join()

  • Set a limit to the number of times the Producer will create data for the Consumer

  • Agree on a sentinel value the Producer will use to signal end of data production

  • Log acquisition of locks on the shared resource and data production/consumption events, including final signing off of worker threads

  • Create a single SharedSpace object from the program's main and pass it to each worker before start

  • Store a private reference to the SharedSpace object internally to each worker

  • Guard against, and log a message describing, the case where the Consumer is ready to consume before any data has been produced

  • Stop the Producer after a given number of iterations

  • Stop the Consumer after it reads the sentinel value

Code


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class Consumer extends Threaded {
  public Consumer(SharedSpace sharedSpace) {
    super(sharedSpace);
  }
  @Override
  public void run() {
    super.run();
    int consumedData = 0;
    while (consumedData != -1) {
      synchronized (sharedSpace) {
        logger.info("Acquired lock on sharedSpace.");
        consumedData = sharedSpace.dataValue;
        if (consumedData == 0) {
          try {
            logger.info("Data production has not started yet. "
                + "Releasing lock on sharedSpace, "
                + "until notification that it has begun.");
            sharedSpace.wait();
          } catch (InterruptedException interruptedException) {
            logger.error("Interrupted while waiting.", interruptedException);
          }
        } else if (consumedData == -1) {
          logger.info("Consumed: END (end of data production token).");
        } else {
          logger.info("Consumed: {}.", consumedData);
          logger.info("Waking up producer to continue data production.");
          sharedSpace.notify();
          try {
            logger.info("Releasing lock on sharedSpace "
                + "until notified of new data availability.");
            sharedSpace.wait();
          } catch (InterruptedException interruptedException) {
            logger.error("Interrupted while waiting.", interruptedException);
          }
        }
      }
    }
    logger.info("Signing off.");
  }
}
class Producer extends Threaded {
  private static final int N_ITERATIONS = 10;
  public Producer(SharedSpace sharedSpace) {
    super(sharedSpace);
  }
  @Override
  public void run() {
    super.run();
    int nIterations = 0;
    while (nIterations <= N_ITERATIONS) {
      synchronized (sharedSpace) {
        logger.info("Acquired lock on sharedSpace.");
        nIterations++;
        if (nIterations <= N_ITERATIONS) {
          sharedSpace.dataValue = nIterations;
          logger.info("Produced: {}", nIterations);
        } else {
          sharedSpace.dataValue = -1;
          logger.info("Produced: END (end of data production token).");
        }
        logger.info("Waking up consumer for data consumption.");
        sharedSpace.notify();
        if (nIterations <= N_ITERATIONS) {
          try {
            logger.info("Releasing lock on sharedSpace until notified.");
            sharedSpace.wait();
          } catch (InterruptedException interruptedException) {
            logger.error("Interrupted while waiting.", interruptedException);
          }
        }
      }
    }
    logger.info("Signing off.");
  }
}
class SharedSpace {
  volatile int dataValue = 0;
}
abstract class Threaded implements Runnable {
  protected Logger logger;
  protected SharedSpace sharedSpace;
  public Threaded(SharedSpace sharedSpace) {
    this.sharedSpace = sharedSpace;
    logger = LoggerFactory.getLogger(this.getClass());
  }
  @Override
  public void run() {
    logger.info("Started.");
    String workerName = getClass().getName();
    Thread.currentThread().setName(workerName);
  }
}
public class ProducerConsumer {
  public static void main(String[] args) {
    SharedSpace sharedSpace = new SharedSpace();
    Thread producer = new Thread(new Producer(sharedSpace), "Producer");
    Thread consumer = new Thread(new Consumer(sharedSpace), "Consumer");
    producer.start();
    consumer.start();
    try {
      producer.join();
      consumer.join();
    } catch (InterruptedException interruptedException) {
      interruptedException.printStackTrace();
    }
  }
}

Execution log


Consumer - Started.
Consumer - Acquired lock on sharedSpace.
Consumer - Data production has not started yet. Releasing lock on sharedSpace, until notification that it has begun.
Producer - Started.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 1
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 1.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 2
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 2.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 3
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 3.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 4
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 4.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 5
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 5.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 6
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 6.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 7
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 7.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 8
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 8.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 9
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 9.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 10
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 10.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: END (end of data production token).
Producer - Waking up consumer for data consumption.
Producer - Signing off.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: END (end of data production token).
Consumer - Signing off.

Question

  • Is the above correct? (e.g. does it use the correct language tools, the right approach, does it contain any stupid code, ...)

But it "looks right"?

I ask about correctness even if the output "looks good" because you can't imagine how many times things went wrong in my testing "one time" and not "the other" (e.g. when the Consumer started first, when the Producer never quit after producing the sentinel, etc.). I have learnt not to claim correctness from "a successful run". On the contrary, I have become very suspicious of pseudo-parallel code! (This one is not even parallel by definition!)

Extended answers

A good question focuses on just one requested piece of advice (the one above), but feel free to mention any insights into the following other topics in your answer if you like:

  • How could I test parallel code as I code my next attempts?

  • Which tools can help me in both development and debugging? Consider I use Eclipse

  • Would the approach change if I allowed the Producer to continue producing, with each production taking some variable amount of time, whilst the Consumer consumes anything that becomes available? Would locking have to be moved elsewhere? Would signalling need to change from this wait/notify paradigm?

  • Is this method of doing things obsolete and should I rather be learning something else? From this tollbooth, I have no idea of what happens "in the real world of Java"

Next steps

  • Where should I go from here? I have seen the notion of "futures" mentioned somewhere but I could use a numbered list of topics to work through in sequence, pedagogically ordered, with links to associated learning resources

Tino Sino

Robottinosino asked Sep 23 '12 17:09




2 Answers

Is the above correct?

The only problem I see is what has been mentioned by @Tudor and @Bhaskar. Whenever you are testing for a condition when you are waiting for it, you must use a while loop. However, this is more about race-conditions with multiple producers and consumers. Spurious wakeups can happen but the race-conditions are much more likely. See my page on the subject.

Yes, you only have 1 producer and 1 consumer but you may try to extend your code for multiple consumers or copy your code to another scenario.
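To make the "use a while loop" point concrete, here is a minimal sketch of the idiom. The `WaitLoopDemo` class and its `awaitFirstValue`, `publish` and `demo` methods are names I made up for illustration; only `SharedSpace` and `dataValue` mirror the question, and 0 is assumed to mean "nothing produced yet" as it does there.

```java
public class WaitLoopDemo {
    /** Stand-in for the question's SharedSpace; 0 means "nothing produced yet". */
    static final class SharedSpace {
        int dataValue = 0; // only read or written while holding the SharedSpace monitor
    }

    /** Blocks until a non-zero value is visible, then returns it. */
    static int awaitFirstValue(SharedSpace sharedSpace) throws InterruptedException {
        synchronized (sharedSpace) {
            // 'while', not 'if': after wait() returns the condition is checked again,
            // so a spurious wakeup or a competing consumer cannot slip through.
            while (sharedSpace.dataValue == 0) {
                sharedSpace.wait();
            }
            return sharedSpace.dataValue;
        }
    }

    /** Stores a value and wakes up every thread waiting on the monitor. */
    static void publish(SharedSpace sharedSpace, int value) {
        synchronized (sharedSpace) {
            sharedSpace.dataValue = value;
            sharedSpace.notifyAll();
        }
    }

    /** Runs one producer thread against the calling thread and returns what was read. */
    static int demo(int value) {
        SharedSpace sharedSpace = new SharedSpace();
        Thread producer = new Thread(() -> publish(sharedSpace, value), "Producer");
        producer.start();
        try {
            int seen = awaitFirstValue(sharedSpace);
            producer.join();
            return seen;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return 0;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(7)); // prints 7
    }
}
```

Because the consumer tests the condition before it ever calls wait(), it does not matter whether the producer gets the lock first or second.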

I have learned not to claim correctness from "a successful run". On the contrary, I have become very suspicious of pseudo-parallel code!

Good instinct.

How could I test parallel code as I code my next attempts?

This is very hard. Scaling it up is one way: add multiple producers and consumers and see if there are problems. Running on multiple architectures with different numbers/types of processors is another. Your best defense will be code correctness: tight synchronization and good use of BlockingQueue, ExecutorService, etc. classes to make your code simpler/cleaner.

No easy answer. Testing multithreaded code is extremely hard.
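As a rough illustration of "scaling it up", here is a sketch of a stress check with several producers and consumers. `StressCheck`, `run` and `POISON` are hypothetical names, and the queue capacity and 30 second timeouts are arbitrary assumptions. The idea is to compare a total you can compute by hand with what the consumers actually saw. A passing run proves nothing, but a failing one tells you a lot.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class StressCheck {
    static final int POISON = -1; // sentinel, like the question's end-of-data token

    /** Returns the total consumed; it should equal producers * (1 + 2 + ... + itemsEach). */
    static long run(int producers, int consumers, int itemsEach) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(16);
        AtomicLong consumedSum = new AtomicLong();
        ExecutorService producerPool = Executors.newFixedThreadPool(producers);
        ExecutorService consumerPool = Executors.newFixedThreadPool(consumers);
        try {
            for (int c = 0; c < consumers; c++) {
                consumerPool.execute(() -> {
                    try {
                        for (int v = queue.take(); v != POISON; v = queue.take()) {
                            consumedSum.addAndGet(v);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            for (int p = 0; p < producers; p++) {
                producerPool.execute(() -> {
                    try {
                        for (int i = 1; i <= itemsEach; i++) {
                            queue.put(i);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            producerPool.shutdown();
            if (!producerPool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("producers did not finish");
            }
            // All real items are queued by now, so add one sentinel per consumer.
            for (int c = 0; c < consumers; c++) {
                queue.put(POISON);
            }
            consumerPool.shutdown();
            if (!consumerPool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumers did not finish");
            }
            return consumedSum.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            producerPool.shutdownNow();
            consumerPool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        long expected = 4L * 1000 * 1001 / 2;
        System.out.println(run(4, 3, 1000) == expected); // prints true
    }
}
```

Running something like this in a loop, with different thread counts and on different machines, is about as close to a "test" as you usually get.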

Which tools can help me in both development and debugging?

In terms of general stuff, I'd look into a coverage tool like Emma so you can make sure your unit tests are covering all of your code.

In terms of multithreading code testing, get to know how to read kill -QUIT thread-dumps and look at running threads inside of JConsole. Java profilers like YourKit may also help.
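If you want to see roughly what a thread dump contains without leaving your program, here is a small sketch that prints the name, state and stack of every live thread using the standard `Thread.getAllStackTraces()` call. `ThreadDumpDemo`, `dump` and the "StuckConsumer" thread are made-up names for illustration, and the output format is my own, not the JVM's.

```java
import java.util.Map;

public class ThreadDumpDemo {
    /** Formats name, state and stack of every live thread, similar in spirit to a kill -QUIT dump. */
    static String dump() {
        StringBuilder out = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            Thread thread = entry.getKey();
            out.append('"').append(thread.getName()).append("\" state=")
               .append(thread.getState()).append('\n');
            for (StackTraceElement frame : entry.getValue()) {
                out.append("    at ").append(frame).append('\n');
            }
        }
        return out.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        Object lock = new Object();
        // A thread that waits forever, so there is something interesting to look at.
        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "StuckConsumer");
        waiter.setDaemon(true); // do not keep the JVM alive for the demo thread
        waiter.start();
        Thread.sleep(200);      // give the waiter time to reach wait()
        System.out.println(dump());
    }
}
```

A thread parked in wait() shows up in a WAITING state with Object.wait near the top of its stack, which is the pattern to look for when a consumer never wakes up.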

Would the approach change if I allowed the Producer to continue producing, with each production taking some variable amount of time...

I don't think so. The consumer will wait on the producer forever. Maybe I'm not understanding this question?

Is this method of doing things obsolete and should I rather be learning something else? From this tollbooth, I have no idea of what happens "in the real world of Java"

Learning about the ExecutorService classes is next. These handle a large percentage of the new Thread() style code -- especially when you are dealing with a number of asynchronous tasks being executed with threads. Here's a tutorial.
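For a first taste, here is a minimal sketch of handing tasks to a thread pool instead of calling new Thread() yourself. `ExecutorDemo` and `squares` are hypothetical names, and the pool size of 2 is an arbitrary choice. It also shows the "futures" you mention: each `submit` returns a `Future` you can ask for the result later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorDemo {
    /** Squares 1..n on a small thread pool and returns the results in submission order. */
    static List<Integer> squares(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final int input = i;
                // The lambda returns a value, so it is submitted as a Callable<Integer>.
                futures.add(pool.submit(() -> input * input));
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : futures) {
                results.add(future.get()); // blocks until that particular task is done
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown(); // without this the pool threads keep the JVM alive
        }
    }

    public static void main(String[] args) {
        System.out.println(squares(5)); // prints [1, 4, 9, 16, 25]
    }
}
```

The pool owns the worker threads and the task queue, so there is no wait/notify code of your own left to get wrong.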

Where should I go from here?

Again, ExecutorService. I assume you've read the starting docs. As @Bhaskar mentioned, Java Concurrency in Practice is a good bible.


Here are some general comments about your code:

  • The SharedSpace and Threaded classes seem like a contrived way to do this. If you are playing around with base classes and the like then fine. But in general, I never use a pattern like this. A producer and consumer are usually working with a BlockingQueue like LinkedBlockingQueue in which case the synchronization and volatile payloads are taken care of for you. Also, I tend to inject shared information into an object constructor as opposed to getting it from a base class.

  • Typically if I am using synchronized it is on a private final field. Often I create a private final Object lockObject = new Object(); for locking unless I am working with an object already.

  • Be careful of huge synchronized blocks and putting log messages inside of synchronized sections. Logs usually do synchronized IO to the file-system which can be very expensive. You should have small, very tight, synchronized blocks if possible.

  • You define consumedData outside of the loop. I would define it at the point of the assignment and then use a break to bail from the loop if it is == -1. Make sure to limit your local variables scope if at all possible.

  • Your logging messages are going to dominate your code performance. This means that when you remove them, your code is going to perform completely differently. This is very important to realize when you go to debug problems with it. The performance will also (most likely) change when you move to a different architecture with different CPUs/cores.

  • You probably know this but when you call sharedSpace.notify();, that only means that another thread is notified if it is currently in sharedSpace.wait();. If it is doing something else then it will miss the notification. Just FYI.

  • It's a little strange to do an if (nIterations <= N_ITERATIONS), and then 3 lines below the else do it again. Duplicating the notify() would be better to simplify the branching.

  • You have int nIterations = 0; then a while then inside a ++. That's a recipe for a for loop:

    for (int nIterations = 0; nIterations <= N_ITERATIONS; nIterations++) {
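Several of the comments above (a private final lock object, short synchronized blocks, logging outside the lock, and not losing a notify() that arrives before the wait()) can be sketched together in one place. This is only an illustration under my own assumptions: `HandoffDemo`, `Handoff`, `put`, `take` and `transferSum` are hypothetical names, and System.out stands in for your logger.

```java
public class HandoffDemo {
    /** Hypothetical single-slot replacement for SharedSpace; names are illustrative only. */
    static final class Handoff {
        private final Object lock = new Object(); // private: outside code cannot lock on it
        private boolean full = false;             // remembers a put() even if nobody is waiting yet
        private int value;

        void put(int newValue) throws InterruptedException {
            synchronized (lock) {
                while (full) {
                    lock.wait();   // previous value not consumed yet
                }
                value = newValue;
                full = true;
                lock.notifyAll();
            }
        }

        int take() throws InterruptedException {
            synchronized (lock) {  // short critical section, no I/O inside
                while (!full) {
                    lock.wait();   // nothing produced yet
                }
                full = false;
                lock.notifyAll();
                return value;
            }
        }
    }

    /** Sends 1..n through the slot and returns the sum the consumer saw. */
    static int transferSum(int n) {
        Handoff handoff = new Handoff();
        int[] sum = new int[1];
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    int consumed = handoff.take();
                    // Logging happens after the lock has been released.
                    System.out.println("Consumed: " + consumed);
                    sum[0] += consumed;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Consumer");
        consumer.start();
        try {
            for (int i = 1; i <= n; i++) {
                handoff.put(i);
            }
            consumer.join(); // join() makes the consumer's writes to sum visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println("Sum: " + transferSum(10)); // last line printed is "Sum: 55"
    }
}
```

The `full` flag is what protects against the missed notification: a thread that arrives late sees the flag and never calls wait() at all.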
    

Here's a much tighter version of your code. This is just an example of how I would write it. Again, aside from the missing while there seems to be nothing wrong with your version.

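import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class Consumer implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
    private final BlockingQueue<Integer> queue;
    public Consumer(BlockingQueue<Integer> queue) {
       this.queue = queue;
    }
    @Override
    public void run() {
       try {
          while (true) {
             // take() blocks until the producer has put something in the queue
             int consumedData = queue.take();
             if (consumedData == Producer.FINAL_VALUE) {
                 logger.info("Consumed: END (end of data production token).");
                 break;
             }
             logger.info("Consumed: {}.", consumedData);
          }
       } catch (InterruptedException e) {
          // restore the interrupt flag and stop consuming
          Thread.currentThread().interrupt();
       }
       logger.info("Signing off.");
    }
}

class Producer implements Runnable {
    public static final int FINAL_VALUE = -1;
    private static final int N_ITERATIONS = 10;
    private static final Logger logger = LoggerFactory.getLogger(Producer.class);
    private final BlockingQueue<Integer> queue;
    public Producer(BlockingQueue<Integer> queue) {
       this.queue = queue;
    }
    @Override
    public void run() {
       try {
          // note: this produces 0..N_ITERATIONS, then the end token
          for (int nIterations = 0; nIterations <= N_ITERATIONS; nIterations++) {
             logger.info("Produced: {}", nIterations);
             queue.put(nIterations);
          }
          queue.put(FINAL_VALUE);
          logger.info("Produced: END (end of data production token).");
       } catch (InterruptedException e) {
          // restore the interrupt flag and stop producing
          Thread.currentThread().interrupt();
       }
       logger.info("Signing off.");
    }
}

public class ProducerConsumer {
    public static void main(String[] args) throws InterruptedException {
       // you can add an int argument to the LinkedBlockingQueue constructor
       // to only allow a certain number of items in the queue at one time
       BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
       Thread producer = new Thread(new Producer(queue), "Producer");
       Thread consumer = new Thread(new Consumer(queue), "Consumer");
       producer.start();
       consumer.start();
       producer.join();
       consumer.join();
    }
}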
Gray answered Oct 12 '22 04:10



You seem to have done a pretty good job here. Not much to nitpick actually. One thing I would like to recommend is you should avoid synchronizing on the buffer object itself. In this case it's ok, but assuming you switch to a data structure buffer instead, depending on the class it might be synchronized internally (e.g. Vector, although it's obsolete by now), so acquiring a lock from the outside might mess it up.

Edit: Bhaskar makes a good point about using a while to wrap calls to wait. This is because of the infamous spurious wake-ups that can occur, forcing the thread to come out of wait prematurely, so you need to make sure it goes back in.

What you could do next is to implement a finite-buffer producer consumer: have some shared data structure e.g. a linked list and set a maximum size (e.g. 10 items). Then let the producer keep producing and only suspend it whenever there are 10 items in the queue. The consumer will be suspended whenever the buffer is empty.
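A rough sketch of that exercise, in case it helps to have something to compare your own attempt against. The class and method names (`BoundedBufferDemo`, `BoundedBuffer`, `transfer`, `maxSeen`) are my own inventions, and the `maxSeen` counter exists only so the demo can check that the limit was respected.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

public class BoundedBufferDemo {
    /** Hypothetical finite buffer backed by a linked list; names are illustrative. */
    static final class BoundedBuffer {
        private final Object lock = new Object(); // separate lock, not the list itself
        private final LinkedList<Integer> items = new LinkedList<>();
        private final int capacity;
        private int maxSeen = 0; // largest size ever reached, for checking the bound

        BoundedBuffer(int capacity) {
            this.capacity = capacity;
        }

        void put(int item) throws InterruptedException {
            synchronized (lock) {
                while (items.size() == capacity) {
                    lock.wait(); // buffer full: suspend the producer
                }
                items.addLast(item);
                maxSeen = Math.max(maxSeen, items.size());
                lock.notifyAll();
            }
        }

        int take() throws InterruptedException {
            synchronized (lock) {
                while (items.isEmpty()) {
                    lock.wait(); // buffer empty: suspend the consumer
                }
                int item = items.removeFirst();
                lock.notifyAll();
                return item;
            }
        }

        int maxSeen() {
            synchronized (lock) {
                return maxSeen;
            }
        }
    }

    /** Pushes 1..count through the buffer; returns what the consumer received, in order. */
    static List<Integer> transfer(BoundedBuffer buffer, int count) {
        List<Integer> received = new ArrayList<>();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Producer");
        producer.start();
        try {
            for (int i = 0; i < count; i++) {
                received.add(buffer.take());
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        BoundedBuffer buffer = new BoundedBuffer(10);
        List<Integer> received = transfer(buffer, 1000);
        System.out.println("Received " + received.size()
                + " items; buffer never held more than " + buffer.maxSeen());
    }
}
```

Here the producer and consumer really do overlap: the producer only stops when the list holds 10 items, and the consumer only stops when it is empty.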

The next steps you could take are learning how to automate the process you have implemented manually. Take a look at BlockingQueue that provides a buffer with blocking behavior (i.e. the consumer will automatically block if the buffer is empty and the producer will block if it's full).
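To see the "full" side of that blocking behavior without hanging a demo, here is a small sketch using `ArrayBlockingQueue` with the timed `offer` and `poll` variants, which give up after a timeout instead of blocking forever like `put` and `take`. `BoundedQueueDemo`, `acceptedOffers` and `pollEmpty` are hypothetical names, and the 10 ms timeout is an arbitrary choice.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedQueueDemo {
    /** Tries to add 'attempts' items to a queue of the given capacity; returns how many fit. */
    static int acceptedOffers(int capacity, int attempts) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        try {
            for (int i = 0; i < attempts; i++) {
                // Nobody is consuming, so once the queue is full every offer times out.
                if (queue.offer(i, 10, TimeUnit.MILLISECONDS)) {
                    accepted++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return accepted;
    }

    /** Polls an empty queue with a timeout; returns null because nothing ever arrives. */
    static Integer pollEmpty() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        try {
            return queue.poll(10, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(acceptedOffers(10, 13)); // prints 10
        System.out.println(pollEmpty());            // prints null
    }
}
```

In a real producer/consumer pair you would normally use put() and take() and let the queue do the suspending for you.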

Also, depending on the situation, executors (look at ExecutorService) can be a worthy replacement, since they encapsulate a task queue and one or more workers (consumers) so all you need is the producer.

Tudor answered Oct 12 '22 04:10
