 

Work queues in Clojure

Tags: queue, clojure

I’m using a Clojure application to access data from a web API. I’m going to be making a lot of requests, and many of the requests will lead to more requests being made, so I want to keep the request URLs in a queue that will leave 60 seconds between subsequent downloads.

Following this blog post I put this together:

(def queue-delay (* 1000 60)) ; one minute

(defn offer!
  [q x]
  (.offerLast q x)
  q)

(defn take!
  [q]
  (.takeFirst q))

(def my-queue (java.util.concurrent.LinkedBlockingDeque.))

(defn- process-queue-item
  [item]
  (println ">> " item)   ; this would be replaced by downloading `item`
  (Thread/sleep queue-delay))

If I include a (future (process-queue-item (take! my-queue))) in my code somewhere then at the REPL I can (offer! my-queue "something") and I see the ">> something" printed immediately. So far so good! But I need the queue to last for the entire time my program is active. The (future ...) call I just mentioned works to pull one item out of the queue, once it’s available, but I want something that will watch the queue continually and call process-queue-item whenever something is available.

Also, contrary to the usual Clojure love for concurrency, I want to ensure that only one request is being made at a time and that my program waits 60 seconds to make each subsequent request.

I think this Stack Overflow question is relevant, but I’m not sure how to adapt it to do what I want. How do I poll my queue continuously and ensure that only one request is being run at once?

bdesham asked Aug 29 '12 13:08


3 Answers

Here's a code snippet from a project I did for fun. It's not perfect, but it can give you an idea of how I got around the "wait 55 seconds for the first item" problem. It cycles through promises, using futures to process jobs immediately when a worker is free, or blocking until a promise is delivered.

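;; work-manager, dequeue, process-job and request-promise-from-work-manager
;; are not shown in this snippet.
(defn ^:private process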
  [queues]
  (loop [[q & qs :as q+qs] queues p (atom true)]
    (when-not (Thread/interrupted)
      (if (or
            (< (count (:promises @work-manager)) (:max-workers @work-manager))
            @p) ; blocks until a worker is available
        (if-let [job (dequeue q)]
          (let [f (future-call #(process-job job))]
            (recur queues (request-promise-from-work-manager)))
          (do
            (Thread/sleep 5000)
            (recur (if (nil? qs) queues qs) p)))
        (recur q+qs (request-promise-from-work-manager))))))

Maybe you could do something similar? The code isn't great, and could probably be rewritten to use lazy-seq, but that's just an exercise I haven't gotten to yet!

Jeremy answered Nov 01 '22 21:11


This is quite possibly insane but you could always use a function like this to create a slowed-down lazy sequence:

(defn slow-seq
  "Creates a lazy sequence with delays between each element"
  [delay-ms coll]
  (lazy-seq
    (when-let [s (seq coll)]
      (Thread/sleep delay-ms)
      (cons (first s)
            (slow-seq delay-ms (rest s))))))

The sleep runs before each element is realized, so there is a delay of at least delay-ms between successive function invocations.

You can use it with something like the following, providing a delay in milliseconds:

(doseq [i (slow-seq 500 (range 10))]
  (println (rand-int 10)))

Or alternatively you can put your function call inside the sequence with something like:

(take 10 (slow-seq 500 (repeatedly #(rand-int 10))))

Obviously, in both of the above you can replace (rand-int 10) with whatever code you are using to perform/trigger a download.
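As a rough sketch of that substitution, the following walks a collection of URLs through slow-seq and applies a download function to each one. Note that fetch-page and the example.com URLs are hypothetical stand-ins and are not part of the original answer; a real version might call slurp instead. slow-seq is repeated so the block runs on its own.

```clojure
(defn slow-seq
  "Creates a lazy sequence with a delay before each element."
  [delay-ms coll]
  (lazy-seq
    (when-let [s (seq coll)]
      (Thread/sleep (long delay-ms))
      (cons (first s)
            (slow-seq delay-ms (rest s))))))

;; Hypothetical stand-in for a real download such as (slurp url).
(defn fetch-page
  [url]
  (str "fetched " url))

;; Placeholder URLs, for illustration only.
(def urls ["https://example.com/a"
           "https://example.com/b"
           "https://example.com/c"])

;; map is lazy, so doall forces the fetches to happen here, one after
;; each delay. The delay is 100 ms only to keep the example quick.
(def results (doall (map fetch-page (slow-seq 100 urls))))
```

Because the sequence is lazy, nothing is fetched until something consumes it; without the doall, the downloads would only start when results is first read.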

mikera answered Nov 01 '22 22:11


I ended up rolling my own small library, which I called simple-queue. You can read the full documentation on GitHub, but here is the source in its entirety. I’m not going to keep this answer updated, so if you’d like to use this library please get the source from GitHub.

(ns com.github.bdesham.simple-queue)

(defn new-queue
  "Creates a new queue. Each trigger from the timer will cause the function f
  to be invoked with the next item from the queue. The queue begins processing
  immediately, which in practice means that the first item to be added to the
  queue is processed immediately."
  [f & opts]
  (let [options (into {:delaytime 1}
                      (select-keys (apply hash-map opts) [:delaytime])),
        delaytime (:delaytime options),
        queue {:queue (java.util.concurrent.LinkedBlockingDeque.)},
        task (proxy [java.util.TimerTask] []
               (run []
                 (let [item (.takeFirst (:queue queue)),
                       value (:value item),
                       prom (:promise item)]
                   (if prom
                     (deliver prom (f value))
                     (f value))))),
        timer (java.util.Timer.)]
    (.schedule timer task 0 (int (* 1000 delaytime)))
    (assoc queue :timer timer)))

(defn cancel
  "Permanently stops execution of the queue. If a task is already executing
  then it proceeds unharmed."
  [queue]
  (.cancel (:timer queue)))

(defn process
  "Adds an item to the queue, blocking until it has been processed. Returns
  (f item)."
  [queue item]
  (let [prom (promise)]
    (.offerLast (:queue queue)
                {:value item,
                 :promise prom})
    @prom))

(defn add
  "Adds an item to the queue and returns immediately. The value of (f item) is
  discarded, so presumably f has side effects if you're using this."
  [queue item]
  (.offerLast (:queue queue)
              {:value item,
               :promise nil}))

An example of using this queue to return values:

(require '[com.github.bdesham.simple-queue :as q])

(def url-queue (q/new-queue slurp :delaytime 30))
(def github (q/process url-queue "https://github.com"))
(def google (q/process url-queue "http://www.google.com"))

The calls to q/process will block so that there will be a 30-second delay between the two def statements.
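The blocking comes from the promise that process dereferences: the caller waits until the timer thread delivers a value. Here is a minimal stand-alone sketch of that hand-off, with a plain thread standing in for the timer task. The names result and value, the 200 ms pause, and the deref timeout are illustrative choices and are not part of simple-queue.

```clojure
;; The promise plays the role of :promise in a queued item.
(def result (promise))

;; A background thread stands in for the timer task: after a pause it
;; computes a value and delivers it to the promise.
(doto (Thread. (fn []
                 (Thread/sleep 200)
                 (deliver result (count "hello"))))
  (.start))

;; deref blocks until deliver has been called. The timeout (5000 ms) and
;; the :timed-out fallback are a safety net for this sketch only.
(def value (deref result 5000 :timed-out))
```

Here value is bound only once the background thread has delivered, which is the same reason the second def in the example above cannot complete until the queue has processed its item.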

An example of using this queue purely for side effects:

(defn cache-url
  [{url :url, filename :filename}]
  (spit (java.io.File. filename)
        (slurp url)))

(def url-queue (q/new-queue cache-url :delaytime 30))
(q/add url-queue {:url "https://github.com",
                  :filename "github.html"})    ; returns immediately
(q/add url-queue {:url "https://google.com",
                  :filename "google.html"})    ; returns immediately

Now the calls to q/add return immediately.
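For comparison, the same one-at-a-time behaviour can be sketched without java.util.Timer, using a single worker thread that takes an item, processes it, and then sleeps. This is only an illustration under my own assumptions, not the simple-queue implementation: start-worker!, demo-queue, seen and worker are hypothetical names, and the 100 ms delay just keeps the demo short. One property of this variant is that the pause is measured from the end of each item's processing.

```clojure
(import 'java.util.concurrent.LinkedBlockingDeque)

(defn start-worker!
  "Starts one daemon thread that repeatedly takes an item from q, calls f on
  it, and then sleeps for delay-ms. Returns the Thread so it can be
  interrupted. If f throws, the worker thread ends."
  [^LinkedBlockingDeque q f delay-ms]
  (doto (Thread.
          (fn []
            (try
              (loop []
                (f (.takeFirst q))               ; blocks until an item arrives
                (Thread/sleep (long delay-ms))   ; pause before the next item
                (recur))
              (catch InterruptedException _
                nil))))                          ; interrupting the thread stops it
    (.setDaemon true)
    (.start)))

;; Demo: record each processed item instead of downloading anything.
(def ^LinkedBlockingDeque demo-queue (LinkedBlockingDeque.))
(def seen (atom []))
(def ^Thread worker (start-worker! demo-queue #(swap! seen conj %) 100))

(.offerLast demo-queue "first")
(.offerLast demo-queue "second")
(Thread/sleep 500)     ; give the worker time to handle both items
(.interrupt worker)    ; stop the worker
```

Since only one thread ever calls f, two requests can never overlap, and items are handled in the order they were offered.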

bdesham answered Nov 01 '22 22:11