I have 100 workers (agents) that share one ref
that contains collection of tasks. While this collection have tasks, each worker get one task from this collection (in dosync
block), print it and sometimes put it back in the collection (in dosync
block):
(defn have-tasks?
[tasks]
(not (empty? @tasks)))
(defn get-task
[tasks]
(dosync
(let [task (first @tasks)]
(alter tasks rest)
task)))
(defn put-task
[tasks task]
(dosync (alter tasks conj task))
nil)
(defn worker
[& {:keys [tasks]}]
(agent {:tasks tasks}))
(defn worker-loop
[{:keys [tasks] :as state}]
(while (have-tasks? tasks)
(let [task (get-task tasks)]
(println "Task: " task)
(when (< (rand) 0.1)
(put-task tasks task))))
state)
(defn create-workers
[count & options]
(->> (range 0 count)
(map (fn [_] (apply worker options)))
(into [])))
(defn start-workers
[workers]
(doseq [worker workers] (send-off worker worker-loop)))
(def tasks (ref (range 1 10000000)))
(def workers (create-workers 100 :tasks tasks))
(start-workers workers)
(apply await workers)
When i run this code, the last value printed by agents is (after several tries):
435445
,
4556294
,
1322061
,
3950017
.
But never 9999999
what I expect.
And every time the collection is really empty at the end.
What I'm doing wrong?
Edit:
I rewrote worker-loop as simple as possible:
(defn worker-loop
[{:keys [tasks] :as state}]
(loop []
(when-let [task (get-task tasks)]
(println "Task: " task)
(recur)))
state)
But problem is still there. This code behaves as expected when create one and only one worker.
The problem here has nothing to do with agents and barely anything to do with laziness. Here's a somewhat reduced version of the original code that still exhibits the problem:
(defn f [init]
(let [state (ref init)
task (fn []
(loop [last-n nil]
(if-let [n (dosync
(let [n (first @state)]
(alter state rest)
n))]
(recur n)
(locking :out
(println "Last seen:" last-n)))))
workers (->> (range 0 5)
(mapv (fn [_] (Thread. task))))]
(doseq [w workers] (.start w))
(doseq [w workers] (.join w))))
(defn r []
(f (range 1 100000)))
(defn i [] (f (->> (iterate inc 1)
(take 100000))))
(defn t []
(f (->> (range 1 100000)
(take Integer/MAX_VALUE))))
Running this code shows that both i
and t
, both lazy, reliably work, whereas r
reliably doesn't. The problem is in fact a concurrency bug in the class returned by the range
call. Indeed, that bug is documented in this Clojure ticket and is fixed as of Clojure version 1.9.0-alpha11
.
A quick summary of the bug in case the ticket is not accessible for some reason: in the internals of the rest
call on the result of range
, there was a small opportunity for a race condition: the "flag" that says "the next value has already been computed" was set before the actual value itself, which meant that a second thread could see that flag as true even though the "next value" is still nil
. The call to alter
would then fix that nil
value on the ref. It's been fixed by swapping the two assignment lines.
In cases where the result of range
was either forcibly realized in a single thread or wrapped in another lazy seq, that bug would not appear.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With