
Clojure agents consuming from a queue

I'm trying to figure out the best way to use agents to consume items from a message queue (Amazon SQS). Right now I have a function (process-queue-item) that grabs an item from the queue and processes it.

I want to process these items concurrently, but I can't wrap my head around how to control the agents. Basically, I want to keep all of the agents as busy as possible without pulling too many items from the queue and building up a local backlog. I'll have this running on a couple of machines, so items need to stay in the queue until a worker is actually ready for them.

Can anyone give me some pointers on improving my implementation?

    (def active-agents (ref 0))

    (defn process-queue-item [_]
      (dosync (alter active-agents inc))
      ;retrieve item from Message Queue (Amazon SQS) and process
      (dosync (alter active-agents dec)))

    (defn -main []
      (def agents (for [x (range 20)] (agent x)))

      (loop [loop-count 0]

        (if (< @active-agents 20)
          (doseq [agent agents]
            (if (agent-errors agent)
              (clear-agent-errors agent))
            ;should skip this agent until later if it is still busy processing (not sure how)
            (send-off agent process-queue-item)))

        ;(apply await-for (* 10 1000) agents)
        (Thread/sleep 10000)
        (logging/info (str "ACTIVE AGENTS " @active-agents))
        ;stop after 10 passes; otherwise go around again
        (if (> loop-count 10)
          (do (logging/info (str "done, let's cleanup " loop-count))
              (doseq [agent agents]
                (if (agent-errors agent)
                  (clear-agent-errors agent)))
              (apply await agents)
              (shutdown-agents))
          (recur (inc loop-count)))))
erikcw asked Apr 08 '10 19:04



1 Answer

    (let [switch (atom true) ; a switch to stop workers
          workers (doall
                    (repeatedly 20 ; 20 workers pulling and processing items from SQS
                      #(future (while @switch
                                 (retrieve item from Amazon SQS and process)))))]
      (Thread/sleep 100000) ; arbitrary rule to decide when to stop ;-)
      (reset! switch false) ; stop !
      (doseq [worker workers] @worker)) ; waiting for all workers to be done
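
For anyone who wants to try this pattern without an SQS account, here is a minimal runnable sketch of the same idea. It is only an illustration under stated assumptions: an in-memory `LinkedBlockingQueue` stands in for SQS, and the names `queue`, `processed`, `run-workers` and this zero-argument `process-queue-item` are hypothetical, not part of the original question or of any SQS client library.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue TimeUnit))

;; Hypothetical stand-in for SQS: an in-memory queue preloaded with 100 items.
(def queue (LinkedBlockingQueue. (vec (range 100))))

;; Records every processed item so the result can be inspected afterwards.
(def processed (atom []))

(defn process-queue-item
  "Hypothetical worker step: takes one item from the queue, waiting up to
  100 ms for one to arrive, and 'processes' it by recording it.
  Returns nil when no item arrived in time."
  []
  (when-let [item (.poll queue 100 TimeUnit/MILLISECONDS)]
    (swap! processed conj item)))

(defn run-workers
  "Starts n futures that each pull and process one item at a time
  until the atom `switch` is set to false."
  [n switch]
  (doall (repeatedly n #(future (while @switch (process-queue-item))))))

(let [switch  (atom true)          ; a switch to stop workers
      workers (run-workers 20 switch)]
  (Thread/sleep 1000)              ; arbitrary stop rule, as in the answer
  (reset! switch false)            ; ask the workers to stop
  (doseq [worker workers] @worker)) ; wait for every worker to finish
```

Because each worker takes a single item and only asks for the next one after it has finished, at most 20 items are held locally at any time, so the rest stay in the queue for other machines. When this runs as a standalone program rather than in a REPL, calling `(shutdown-agents)` at the end lets the JVM exit promptly, since futures run on the same thread pool that `send-off` uses.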
cgrand answered Oct 05 '22 01:10
