Asynchronous job queue for web service in Clojure

At the moment I'm trying to construct a web service with a RESTful API that handles some long-running tasks (jobs).

The idea is that a user submits a job with a POST, which returns a URL for checking the job status; the status response also contains a URL for the results. Once the job is complete (i.e. some value has been written to a database), the results URL returns the appropriate information (instead of no results) and the job URL indicates a completed status.

Unfortunately, the calculations are quite intensive, so only one can run at a time; the jobs therefore need to be queued.

In pseudocode, something like this would be needed:

(def job-queue (atom clojure.lang.PersistentQueue/EMPTY)) ;; some queue
(def jobs (atom {}))

(defn schedule-job [params] 
  ;; schedules the job into the queue and 
  ;; adds the job to a jobs map for checking status via GET
  ;; note that the job should not be evaluated until popped from the queue
)

(POST "/analyze" {params :params}
 (schedule-job params))

(GET "/job/:id" [id]
 (get @jobs id))

;; Some function that pops the next item from the queue 
;; and evaluates it when the previous item is complete
;; Note: should not terminate when queue is empty! 

I've looked into Lamina, which allows asynchronous processing, but it didn't seem to suit my needs.

My question is how to dequeue jobs from the job-queue and execute each one after the previous one has finished, without terminating when the queue is empty, i.e. perpetually processing incoming jobs.

JoelKuiper asked Feb 03 '13 13:02


1 Answer

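A java.util.concurrent.ExecutorService may be what you want. It lets you submit a job for later execution and returns a Future that you can query to discover whether the job has completed. An executor created with Executors/newSingleThreadExecutor runs submitted jobs one at a time, in submission order, and its worker thread stays alive waiting for new jobs when the queue is empty.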

(import '[java.util.concurrent Callable Executors])

(def job-executor
  (Executors/newSingleThreadExecutor))

(def jobs (atom {}))

(defn submit-job [func]
  (let [job-id   (str (java.util.UUID/randomUUID))
        callable (reify Callable (call [_] (func)))]
    (swap! jobs assoc job-id (.submit job-executor callable))
    job-id))

(use 'compojure.core)

(defroutes app
  (POST "/jobs" [& params]
    (let [id (submit-job #(analyze params))]
      {:status 201 :headers {"Location" (str "/jobs/" id)}}))
  (GET "/jobs/:id" [id]
    (let [job-future (@jobs id)]
      ;; Unknown job ids have no Future, so check for nil before calling .isDone.
      (if (and job-future (.isDone job-future))
        (.get job-future)
        {:status 404}))))
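
To give the job URL a status to report while a job is still queued or running, the same idea can be split into small helpers. This is only a sketch without the Compojure routes; the names submit-job!, job-status and job-result are made up for illustration, and it assumes each job is a zero-argument function.

```clojure
(import '[java.util.concurrent Callable ExecutorService Executors Future])

;; One worker thread: jobs run one at a time, in submission order.
;; The thread blocks waiting for work while the queue is empty.
(def ^ExecutorService executor (Executors/newSingleThreadExecutor))

;; Maps job-id -> Future (hypothetical in-memory registry).
(def jobs (atom {}))

(defn submit-job!
  "Queues the zero-argument function f and returns a fresh job id."
  [f]
  (let [job-id (str (java.util.UUID/randomUUID))
        ;; Clojure functions implement Callable; the hint selects
        ;; the submit(Callable) overload.
        fut    (.submit executor ^Callable f)]
    (swap! jobs assoc job-id fut)
    job-id))

(defn job-status
  "Returns :unknown, :pending or :done for the given job id.
  :done also covers jobs that failed or were cancelled."
  [job-id]
  (if-let [^Future fut (get @jobs job-id)]
    (if (.isDone fut) :done :pending)
    :unknown))

(defn job-result
  "Returns the job's result, or nil if the job is unknown or unfinished.
  If the job threw, .get rethrows it wrapped in an ExecutionException."
  [job-id]
  (when (= :done (job-status job-id))
    (.get ^Future (get @jobs job-id))))
```

A GET handler could then return the status for :pending, the result for :done, and a 404 only for :unknown. Note that the executor's worker thread is not a daemon thread, so call (.shutdown executor) when the service stops.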
weavejester answered Sep 23 '22 00:09