
Clojure core.async for data computation

I've started using the Clojure core.async library, and I found the concepts of CSP, channels and go blocks really easy to use. However, I'm not sure I'm using them correctly. I have the following code:

(require '[clojure.core.async :refer [chan go >! <! <!!]])

(def x-ch (chan))
(def y-ch (chan))
(def w1-ch (chan))
(def w2-ch (chan))

; they all return matrices
(go (>! x-ch (Mat/* x (map #(/ 1.0 %) (max-fold x)))))
(go (>! y-ch (Mat/* y (map #(/ 1.0 %) (max-fold y)))))
(go (>! w1-ch (gen-matrix 200 300)))
(go (>! w2-ch (gen-matrix 300 100)))

(let [x1 (<!! (go (<! x-ch)))
      y1 (<!! (go (<! y-ch)))
      w1 (<!! (go (<! w1-ch)))
      w2 (<!! (go (<! w2-ch)))]
  ;; do stuff w/ x1 y1 w1 w2
  )

I have predefined (matrix) vectors in the symbols x and y. I need to modify both vectors before I use them, and they are fairly large. I also need to generate two random matrices. Since the go macro starts its computation asynchronously, I split the four computation tasks into separate go blocks and put each result onto a channel. Then, in a let block, I take the values from the channels and bind them to symbols. The takes all use the blocking <!! function, since they run on the main thread.

What I'm trying to do basically is speed up my computation time by splitting program fragments into async processes. Is this the right way to do it?

asked Oct 19 '15 at 00:10 by Lordking



2 Answers

For this kind of processing, future may be a slightly better fit.

The following example is simple to grasp:

 (def f 
   (future 
     (Thread/sleep 10000) 
     (println "done") 
     100))

The body of the future starts running immediately on another thread, so the code above returns at once while that thread sleeps for 10 seconds and then prints "done".

When you need the value, you can simply use:

(deref f)
; or @f

This returns the value of the future's body (100 in the example above).

If you call deref before the 10 seconds have elapsed, the call blocks until the computation has finished.
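A small sketch of the blocking behaviour described above, using only clojure.core: deref also accepts a timeout in milliseconds plus a fallback value, and realized? reports whether the future has finished. The 1-second sleep is an arbitrary stand-in for the 10-second one, and the timing-dependent outputs are marked as likely rather than guaranteed.

```clojure
;; The body starts running on another thread as soon as the future is created.
(def f
  (future
    (Thread/sleep 1000) ; stand-in for slow work (1 s instead of 10 s)
    100))

;; Wait at most 10 ms; if the body has not finished, return :not-yet instead.
(println (deref f 10 :not-yet)) ; almost certainly prints :not-yet
(println (realized? f))         ; almost certainly prints false

;; A plain deref blocks until the body has finished.
(println @f)            ; prints 100
(println (realized? f)) ; prints true
```

When this runs as a standalone script rather than at the REPL, calling (shutdown-agents) at the end lets the JVM exit promptly, since futures run on the agent thread pool.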

In your example you are only waiting for computations to finish, and you are not really concerned with messages and interactions between channel participants, so future is what I would recommend:

(future
  (Mat/* x (map #(/ 1.0 %) (max-fold x))))
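Applied to the whole question, the same idea replaces the four channels and go blocks with four futures that are dereferenced inside the let. This is only a sketch: max-fold, gen-matrix and scale-columns (standing in for Mat/*) are hypothetical plain-Clojure replacements for helpers the question does not show, and x and y are small made-up sample matrices.

```clojure
;; Hypothetical stand-ins for the question's helpers (not shown in the question).
(defn max-fold
  "Column-wise maxima of a matrix given as a vector of row vectors."
  [m]
  (apply map max m))

(defn gen-matrix
  "rows x cols matrix of random doubles in [0, 1)."
  [rows cols]
  (vec (repeatedly rows #(vec (repeatedly cols rand)))))

(defn scale-columns
  "Multiplies each column of m by the matching entry of factors (stand-in for Mat/*)."
  [m factors]
  (mapv #(mapv * % factors) m))

;; Hypothetical sample inputs.
(def x [[1.0 2.0] [4.0 8.0]])
(def y [[2.0 1.0] [4.0 2.0]])

;; Each future starts computing immediately on its own thread.
(def x-f  (future (scale-columns x (map #(/ 1.0 %) (max-fold x)))))
(def y-f  (future (scale-columns y (map #(/ 1.0 %) (max-fold y)))))
(def w1-f (future (gen-matrix 200 300)))
(def w2-f (future (gen-matrix 300 100)))

;; Each deref blocks only until that particular result is ready.
(let [x1 @x-f
      y1 @y-f
      w1 @w1-f
      w2 @w2-f]
  ;; do stuff w/ x1 y1 w1 w2
  (println x1)                            ; prints [[0.25 0.25] [1.0 1.0]]
  (println (count w1) (count (first w1)))) ; prints 200 300
```

Compared with the go-block version, there are no channels to create and no extra go block wrapped around each take, and the CPU-heavy work runs on the future thread pool instead of core.async's small go-block pool.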
answered Nov 04 '22 at 11:11 by Nicolas Modrzyk



If you want to speed up your program more generally by running code in parallel, you could look at Clojure's reducers or Aphyr's Tesser. These split the work of a single computation into parts that can run in parallel and then combine the partial results, so they can keep as many cores busy as your computer has. If you run each of your computations in a future or a go block, each computation still runs on a single thread; some will finish before others, and the cores they were using will then sit idle.
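As a minimal sketch of the reducers approach: clojure.core.reducers, which ships with Clojure itself, provides fold. It splits a foldable collection such as a vector into chunks, reduces the chunks in parallel on a fork/join pool and then combines the partial results. Summing a large vector is only an illustrative stand-in for the matrix work in the question; Tesser is not shown here.

```clojure
(require '[clojure.core.reducers :as r])

;; fold needs a foldable source to run in parallel; vectors and maps qualify.
(def xs (vec (range 1000000)))

;; fold splits xs into chunks (512 elements by default), reduces each chunk
;; with + in parallel, and combines the chunk results with +.
(def total (r/fold + (r/map inc xs)))

(println total) ; prints 500000500000
```

With a single function argument, fold uses it both to reduce each chunk and to combine chunk results, and calls it with no arguments to obtain the starting value, which is why + (where (+) returns 0) works here.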

answered Nov 04 '22 at 11:11 by Daniel Compton
