
Clojure message handling / async, multithreaded

I have a small Clojure consumer/publisher receiving messages, processing them and sending them off to other consumers, all via RabbitMQ.

I've defined a message-handler that handles messages in a separate thread (separate from the main thread, that is). As can be seen in the code below, the thread receives and sends messages synchronously, all inside an event loop started by the lcm/subscribe function.

So, the question is: what would be the "Clojure way" to create an N-sized thread pool of these synchronous message-handlers? I guess the non-Clojure way would be to manually spawn a number of threads via Java interop.

Also, would that speed up message processing at all, considering that the processing isn't very CPU intensive? Would it be better to make these message-handlers async, given that more time is spent publishing than processing?

And finally, how would I go about measuring the performance of these competing approaches (I come from the Ruby/JavaScript world, and there isn't any multithreading there)?

NOTE: I know all this could be avoided by just scaling horizontally and spawning more JVM processes listening to the message bus, but since the app is going to be deployed on Heroku, I'd like to use as many resources as possible in each dyno/process.

(defn message-handler
  [ch metadata ^bytes payload]
  (let [msg (json/parse-string (String. payload "UTF-8"))
        processed-message (process msg)] 
    (lb/publish ch "e.events" "" processed-message)))

(defn -main
  [& args]
  (let [conn          (rmq/connect {:uri (System/getenv "MSGQ")})
        ch            (lch/open conn)
        q-name        "q.events.tagger"
        e-sub-name    "e.events.preproc"
        e-pub-name    "e.events"
        routing-key   "tasks.taggify"]
    (lq/declare ch q-name :exclusive false :auto-delete false)
    (le/declare ch e-pub-name "fanout" :durable false)
    (lq/bind ch q-name e-sub-name :routing-key routing-key)
    (.start (Thread. (fn []
                       (lcm/subscribe ch q-name message-handler :auto-ack true))))))

On a more basic note... how would I go about refactoring this code to support registering the message-handler callback with an additional argument, like this:

    (.start (Thread. (fn []
                       (lcm/subscribe ch q-name (message-handler pub-name) :auto-ack true))))))

and then publish with a reference:

    (lb/publish ch pub-name "" processed-message)))

instead of a literal:

    (lb/publish ch "e.events" "" processed-message)))
asked Oct 23 '22 07:10 by neektza


1 Answer

For the second part of the question, you can use partial application as shown below:

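;; pub-name comes first so it can be bound with partial; the remaining
;; three arguments are the ones passed to the handler by the consumer.
(defn message-handler
  [pub-name ch metadata ^bytes payload]
  (let [msg (json/parse-string (String. payload "UTF-8"))
        processed-message (process msg)]
    (lb/publish ch pub-name "" processed-message)))

;; Inside the let in -main, where ch, q-name and e-pub-name are bound:
(.start
  (Thread.
    (fn []
      (lcm/subscribe ch q-name (partial message-handler e-pub-name) :auto-ack true))))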
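The question asked for a call shaped like `(message-handler pub-name)`. A function that returns the handler as a closure gives that shape and behaves the same as the `partial` version. The following is only a minimal sketch that runs without RabbitMQ: `fake-publish`, `published` and `make-message-handler` are hypothetical names introduced for illustration, and `fake-publish` merely records calls where the real code would use `lb/publish`.

```clojure
;; Hypothetical stand-in for lb/publish: records each call in an atom
;; instead of sending anything to a broker.
(def published (atom []))

(defn fake-publish
  [ch exchange routing-key payload]
  (swap! published conj {:exchange exchange :payload payload}))

;; Variant 1: extra argument first, bound later with partial.
(defn message-handler
  [pub-name ch metadata payload]
  (fake-publish ch pub-name "" payload))

;; Variant 2: a function that returns the three-argument handler,
;; closing over pub-name.
(defn make-message-handler
  [pub-name]
  (fn [ch metadata payload]
    (fake-publish ch pub-name "" payload)))

(def handler-a (partial message-handler "e.events"))
(def handler-b (make-message-handler "e.events"))

;; Both are invoked with the same three arguments: ch, metadata, payload.
(handler-a :ch {} "first")
(handler-b :ch {} "second")
```

With the closure variant, the subscribe call would read `(lcm/subscribe ch q-name (make-message-handler e-pub-name) :auto-ack true)`. Which form to prefer is mostly a matter of taste; `partial` avoids defining a second function, while the closure keeps the call site closer to what the question sketched.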
answered Oct 27 '22 09:10 by Ankur