I am new to Clojure and am trying to understand how to use its concurrency features properly, so any critique or suggestions are appreciated. I am trying to write a small test program in Clojure that works as follows:
Here is my plan for each step above:
The producer-consumer problem is an example of a multi-process synchronization problem. It describes two processes, the producer and the consumer, that share a common fixed-size buffer used as a queue. The producer's job is to generate data, put it into the buffer, and start again.
In computing, the producer-consumer problem (also known as the bounded-buffer problem) is a family of problems described by Edsger W. Dijkstra since 1965.
Here's my take on it. I made a point of only using Clojure data structures to see how that would work out. Note that it would have been perfectly usual and idiomatic to take a blocking queue from the Java toolbox and use it here; the code would be easy to adapt, I think. Update: I actually did adapt it to java.util.concurrent.LinkedBlockingQueue, see below.
Call (pro-con) to start a test run; then have a look at the contents of output to see if anything happened and queue-lengths to see if they stayed within the given bound.
Update: To explain why I felt the need to use ensure below (I was asked about this on IRC), this is to prevent write skew (see the Wikipedia article on Snapshot isolation for a definition). If I substituted @queue for (ensure queue), it would become possible for two or more producers to check the length of the queue, find that it is less than 4, then place additional items on the queue and possibly bring the total length of the queue above 4, breaking the constraint. Similarly, two consumers doing @queue could accept the same item for processing, then pop two items off the queue. ensure prevents either of these scenarios from happening.
(def go-on? (atom true))
(def queue (ref clojure.lang.PersistentQueue/EMPTY))
(def output (ref ()))
(def queue-lengths (ref ()))
;; No earmuffs: the *name* convention is reserved for dynamic vars,
;; and newer Clojure versions warn about it on a plain def.
(def max-queue-length 4)

;; Stops the run by flipping go-on? after timeout milliseconds.
(defn overseer
  ([] (overseer 20000))
  ([timeout]
   (Thread/sleep timeout)
   (swap! go-on? not)))

;; Records the queue length after every change to queue.
(defn queue-length-watch [_ _ _ new-queue-state]
  (dosync (alter queue-lengths conj (count new-queue-state))))

(add-watch queue :queue-length-watch queue-length-watch)

(defn producer [tag]
  (future
    (while @go-on?
      ;; sleep only after a successful enqueue; otherwise retry at once
      (when (dosync (let [l (count (ensure queue))]
                      (when (< l max-queue-length)
                        (alter queue conj tag)
                        true)))
        (Thread/sleep (rand-int 2000))))))

(defn consumer []
  (future
    (while @go-on?
      (Thread/sleep 100) ; don't look at the queue too often
      (when-let [item (dosync (let [item (first (ensure queue))]
                                (alter queue pop)
                                item))]
        (Thread/sleep (rand-int 500)) ; do stuff
        (dosync (alter output conj item)))))) ; and let us know

(defn pro-con []
  (reset! go-on? true)
  (dorun (map #(%1 %2)
              (repeat 5 producer)
              (iterate inc 0)))
  (dorun (repeatedly 2 consumer))
  (overseer))
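The producer-side check can also be looked at in isolation. What follows is only a minimal sketch, not part of the program above: demo-queue, bound, try-enqueue!, results and accepted are hypothetical names introduced for illustration. Several threads race to fill a queue that nobody drains, so the number of accepted items should end up equal to the bound.

```clojure
;; Hypothetical names for illustration only.
(def bound 4)
(def demo-queue (ref clojure.lang.PersistentQueue/EMPTY))

(defn try-enqueue!
  "Adds item to demo-queue if it currently holds fewer than bound items.
  Returns true if the item was added, nil otherwise."
  [item]
  (dosync
    (when (< (count (ensure demo-queue)) bound)
      (alter demo-queue conj item)
      true)))

;; Eight threads each attempt 50 enqueues; nothing is ever consumed.
(def results
  (->> (range 8)
       (mapv (fn [tag] (future (doall (repeatedly 50 #(try-enqueue! tag))))))
       (mapcat deref)
       doall))

;; Number of enqueue attempts that actually succeeded.
(def accepted (count (filter true? results)))
```

Evaluating accepted and (count @demo-queue) at the REPL afterwards is a quick way to see whether the bound held under contention. When running this as a script rather than at the REPL, a final (shutdown-agents) lets the JVM exit promptly, since future uses the agent thread pool.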
A version of the above written using LinkedBlockingQueue. Note how the general outline of the code is basically the same, with some details actually being slightly cleaner. I removed queue-lengths from this version, as LBQ takes care of that constraint for us.
(def go-on? (atom true))
;; No earmuffs: the *name* convention is reserved for dynamic vars,
;; and newer Clojure versions warn about it on a plain def.
(def max-queue-length 4)
(def queue (java.util.concurrent.LinkedBlockingQueue. max-queue-length))
(def output (ref ()))

;; Stops the run by flipping go-on? after timeout milliseconds.
(defn overseer
  ([] (overseer 20000))
  ([timeout]
   (Thread/sleep timeout)
   (swap! go-on? not)))

(defn producer [tag]
  (future
    (while @go-on?
      (.put queue tag) ; blocks while the queue is full
      (Thread/sleep (rand-int 2000)))))

(defn consumer []
  (future
    (while @go-on?
      ;; I'm using .poll on the next line so as not to block
      ;; indefinitely if we're done; .poll returns nil when the
      ;; queue is empty, so nil can't be used as an item (and
      ;; LinkedBlockingQueue rejects nulls anyway); there's a number
      ;; of other ways to go about this if this is a problem, see
      ;; docs on LinkedBlockingQueue
      (when-let [item (.poll queue)]
        (Thread/sleep (rand-int 500)) ; do stuff
        (dosync (alter output conj item)))))) ; and let us know

(defn pro-con []
  (reset! go-on? true)
  (dorun (map #(%1 %2)
              (repeat 5 producer)
              (iterate inc 0)))
  (dorun (repeatedly 2 consumer))
  (overseer))
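One of the "other ways" hinted at in the consumer's comment is to use the timed variants of offer and poll, which wait for a bounded time rather than forever (.put) or not at all (untimed .poll). Here is a minimal sketch of that idea, under stated assumptions: running?, demo-queue, consumed, timed-producer, timed-consumer and the 100 ms timeout are hypothetical choices made for illustration. Only offer(e, timeout, unit) and poll(timeout, unit) come from java.util.concurrent.LinkedBlockingQueue.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue TimeUnit))

;; Hypothetical names for illustration only.
(def running? (atom true))
(def ^LinkedBlockingQueue demo-queue (LinkedBlockingQueue. 4))
(def consumed (atom []))

(defn timed-producer
  "Puts tag on demo-queue n times, giving up early if running? turns false."
  [tag n]
  (future
    (loop [sent 0]
      (when (and @running? (< sent n))
        ;; .offer waits up to 100 ms for free space and returns
        ;; false on timeout, so the loop can re-check running?
        (if (.offer demo-queue tag 100 TimeUnit/MILLISECONDS)
          (recur (inc sent))
          (recur sent))))))

(defn timed-consumer []
  (future
    ;; keep draining until told to stop and the queue is empty
    (while (or @running? (pos? (.size demo-queue)))
      ;; timed .poll returns nil if nothing arrives within 100 ms
      (when-some [item (.poll demo-queue 100 TimeUnit/MILLISECONDS)]
        (swap! consumed conj item)))))

(def producers (mapv #(timed-producer % 10) (range 3)))
(def consumers (vec (repeatedly 2 timed-consumer)))

(run! deref producers)   ; wait until every producer has sent its items
(reset! running? false)  ; then let the consumers drain the rest and exit
(run! deref consumers)
```

With this shape, neither side can stay blocked on the queue after the stop flag flips, and the consumers no longer spin on an empty queue. As with any script that uses future, calling (shutdown-agents) at the end lets the JVM exit promptly.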