
Producer consumer with qualifications

I am new to Clojure and am trying to understand how to properly use its concurrency features, so any critiques or suggestions are appreciated. I am trying to write a small test program in Clojure that works as follows:

  1. there are 5 producers and 2 consumers
  2. a producer waits for a random time and then pushes a number onto a shared queue.
  3. a consumer should pull a number off the queue as soon as the queue is nonempty and then sleep for a short time to simulate doing work
  4. the consumers should block when the queue is empty
  5. producers should block when the queue has more than 4 items in it to prevent it from growing huge

Here is my plan for each step above:

  1. the producers and consumers will be agents that don't really care about their state (just nil values or something); I just use the agents to send-off a "consumer" or "producer" function to run at some point. The shared queue will then be (def queue (ref [])). Perhaps this should be an atom, though?
  2. in the "producer" agent function, simply (Thread/sleep (rand-int 1000)) and then (dosync (alter queue conj (rand-int 100))) to push onto the queue.
  3. I am thinking of making the consumer agents watch the queue for changes with add-watcher. I am not sure about this, though: it will wake up the consumers on any change, even if the change came from a consumer pulling something off (possibly making the queue empty). Perhaps checking for this in the watcher function is sufficient. Another problem I see is what happens when a producer adds something new to the queue while all consumers are busy. Does the watched event get queued up on some consumer agent, or does it disappear?
  4. see above
  5. I really don't know how to do this. I heard that Clojure's seque may be useful, but I couldn't find enough documentation on how to use it, and my initial testing didn't seem to work (sorry, I don't have the code on me anymore)
tgguy asked May 03 '10 17:05


1 Answer

Here's my take on it. I made a point of only using Clojure data structures to see how that would work out. Note that it would have been perfectly usual and idiomatic to take a blocking queue from the Java toolbox and use it here; the code would be easy to adapt, I think. Update: I actually did adapt it to java.util.concurrent.LinkedBlockingQueue, see below.

clojure.lang.PersistentQueue

Call (pro-con) to start a test run; then have a look at the contents of output to see if anything happened and queue-lengths to see if they stayed within the given bound.

Update: To explain why I felt the need to use ensure below (I was asked about this on IRC), this is to prevent write skew (see the Wikipedia article on Snapshot isolation for a definition). If I substituted @queue for (ensure queue), it would become possible for two or more producers to check the length of the queue, find that it is less than 4, then place additional items on the queue and possibly bring the total length of the queue above 4, breaking the constraint. Similarly, two consumers doing @queue could accept the same item for processing, then pop two items off the queue. ensure prevents either of these scenarios from happening.
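
To make the check-then-act pattern described above concrete, here is a minimal single-threaded sketch of a bounded push that reads the queue through ensure. The names bounded-queue, max-len and try-push! are hypothetical and exist only for this illustration; they are not part of the answer's code.

```clojure
;; Hypothetical names for illustration only.
(def bounded-queue (ref clojure.lang.PersistentQueue/EMPTY))
(def max-len 4)

(defn try-push!
  "Adds item to bounded-queue if it holds fewer than max-len items.
  Returns true on success and nil when the queue is already full."
  [item]
  (dosync
   ;; ensure protects the ref from modification by other transactions
   ;; until this one commits, so the length check cannot go stale
   ;; between the read and the alter.
   (when (< (count (ensure bounded-queue)) max-len)
     (alter bounded-queue conj item)
     true)))

;; Six attempts against a bound of four: the last two are refused.
(def results (mapv try-push! (range 6)))
;; results             => [true true true true nil nil]
;; (count @bounded-queue) => 4
```

Run from a single thread this only demonstrates the bound itself; the point of ensure shows up when several producers call something like try-push! concurrently.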

(def go-on? (atom true))
(def queue (ref clojure.lang.PersistentQueue/EMPTY))
(def output (ref ()))
(def queue-lengths (ref ()))
(def ^:dynamic *max-queue-length* 4) ; ^:dynamic avoids the earmuffs warning on Clojure 1.3+

(defn overseer
  ([] (overseer 20000))
  ([timeout]
     (Thread/sleep timeout)
     (swap! go-on? not)))

(defn queue-length-watch [_ _ _ new-queue-state]
  (dosync (alter queue-lengths conj (count new-queue-state))))

(add-watch queue :queue-length-watch queue-length-watch)

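(defn producer [tag]
  (future
   (while @go-on?
     ;; Check the bound and enqueue in one transaction; the dosync
     ;; returns true only if the item was actually added.
     (when (dosync (let [l (count (ensure queue))]
                     (when (< l *max-queue-length*)
                       (alter queue conj tag)
                       true)))
       ;; Sleep only after a successful push; if the queue was full,
       ;; loop around and retry immediately.
       (Thread/sleep (rand-int 2000))))))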

(defn consumer []
  (future
   (while @go-on?
     (Thread/sleep 100)       ; don't look at the queue too often
     ;; peek and pop in one transaction so that no two consumers take
     ;; the same item; peek returns nil when the queue is empty
     (when-let [item (dosync (let [item (peek (ensure queue))]
                               (alter queue pop)
                               item))]
       (Thread/sleep (rand-int 500))         ; do stuff
       (dosync (alter output conj item)))))) ; and let us know

(defn pro-con []
  (reset! go-on? true)
  (dorun (map #(%1 %2)
              (repeat 5 producer)
              (iterate inc 0)))
  (dorun (repeatedly 2 consumer))
  (overseer))
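
The length-recording watch used above can be tried in isolation. The following is a small sketch, assuming a fresh ref and an atom as the log; the names q, seen-lengths and :length-log are made up for the example. It relies on the documented behaviour that a watch on a ref is called synchronously after the transaction commits.

```clojure
;; Hypothetical names for illustration only.
(def q (ref clojure.lang.PersistentQueue/EMPTY))
(def seen-lengths (atom []))

;; A watch fn receives the key, the reference, the old state and the
;; new state; here only the new state is used.
(add-watch q :length-log
           (fn [_key _ref _old-state new-state]
             (swap! seen-lengths conj (count new-state))))

(dosync (alter q conj :a))  ; queue now holds 1 item
(dosync (alter q conj :b))  ; 2 items
(dosync (alter q pop))      ; back to 1 item

;; The largest length ever observed tells us whether a bound held.
(def max-seen (apply max @seen-lengths))
```

After a real run of (pro-con), the same idea applies to the answer's own log: (apply max @queue-lengths) should not exceed the configured bound.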

java.util.concurrent.LinkedBlockingQueue

A version of the above written using LinkedBlockingQueue. Note how the general outline of the code is basically the same, with some details actually being slightly cleaner. I removed queue-lengths from this version, as LBQ takes care of that constraint for us.

(def go-on? (atom true))
(def ^:dynamic *max-queue-length* 4) ; ^:dynamic avoids the earmuffs warning on Clojure 1.3+
(def queue (java.util.concurrent.LinkedBlockingQueue. *max-queue-length*))
(def output (ref ()))

(defn overseer
  ([] (overseer 20000))
  ([timeout]
     (Thread/sleep timeout)
     (swap! go-on? not)))

(defn producer [tag]
  (future
   (while @go-on?
     (.put queue tag)
     (Thread/sleep (rand-int 2000)))))

(defn consumer []
  (future
   (while @go-on?
     ;; I'm using .poll on the next line so as not to block
     ;; indefinitely if we're done; note that this has the
     ;; side effect that nulls (= nils) on the queue will not
     ;; be handled; there are a number of other ways to go about
     ;; this if this is a problem, see the docs on LinkedBlockingQueue
     (when-let [item (.poll queue)]
       (Thread/sleep (rand-int 500)) ; do stuff
       (dosync (alter output conj item)))))) ; and let us know

(defn pro-con []
  (reset! go-on? true)
  (dorun (map #(%1 %2)
              (repeat 5 producer)
              (iterate inc 0)))
  (dorun (repeatedly 2 consumer))
  (overseer))
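
As one of the "other ways" hinted at in the consumer's comment, LinkedBlockingQueue also offers a non-blocking .offer and a timed .poll, both part of the standard java.util.concurrent API. The sketch below is an assumption about how they might be used here, not a change to the code above; lbq and drain! are hypothetical names.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue TimeUnit))

;; Hypothetical names for illustration only; capacity 2 keeps it short.
(def ^LinkedBlockingQueue lbq (LinkedBlockingQueue. 2))

;; .offer returns false instead of blocking when the queue is full.
(def offers
  [(.offer lbq :a)
   (.offer lbq :b)
   (.offer lbq :c)])

(defn drain!
  "Takes items from q until a timed poll comes back empty (nil),
  and returns the items taken, in order."
  [^LinkedBlockingQueue q timeout-ms]
  (loop [acc []]
    ;; The timed poll waits up to timeout-ms for an item, so a consumer
    ;; neither spins on an empty queue nor blocks forever.
    (if-let [item (.poll q (long timeout-ms) TimeUnit/MILLISECONDS)]
      (recur (conj acc item))
      acc)))

(def drained (drain! lbq 50))
```

In a consumer loop like the one above, replacing (.poll queue) with a timed poll would let the thread wait briefly for work while still re-checking go-on? at regular intervals.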
Michał Marczyk answered Nov 10 '22 09:11