
Is there a Clojure idiom for dispatching multiple expressions in parallel

I have a number of (unevaluated) expressions held in a vector: [ expr1 expr2 expr3 ... ]

What I wish to do is hand each expression to a separate thread and wait until one returns a value. At that point I'm not interested in the results from the other threads and would like to cancel them to save CPU resources.

(I realise that this could cause non-determinism, in that different runs of the program might cause different expressions to finish first. I have this in hand.)

Is there a standard / idiomatic way of achieving the above?

Sean Holdsworth asked Aug 19 '13 13:08


3 Answers

Here's my take on it.

Basically, each future delivers its result to one shared promise. You start a future per expression, dereference the promise to block until the first result arrives, and then cancel all the futures:

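(defn run-and-cancel [& exprs]
  (let [p   (promise)
        ;; doall starts every future now; the first one to finish delivers p
        fs  (doall (map #(future (deliver p (eval %))) exprs))
        ;; block until some future has delivered a value
        res @p]
    ;; map is lazy and its result was discarded, so the cancels never ran;
    ;; doseq forces the side effect
    (doseq [f fs] (future-cancel f))
    res))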
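
A minimal sketch of the same promise-based idea that races zero-argument functions instead of quoted forms, which avoids eval and keeps access to local bindings. The name first-result is hypothetical and not part of the answer above. One caveat: future-cancel only interrupts the underlying thread, so a CPU-bound task that never checks for interruption will keep running.

```clojure
;; Sketch only: `first-result` is an illustrative name, not a library function.
(defn first-result
  "Runs each thunk in its own future, returns the first value delivered,
  then asks the remaining futures to cancel."
  [& thunks]
  (let [p   (promise)
        ;; mapv is eager, so every future is started immediately
        fs  (mapv (fn [t] (future (deliver p (t)))) thunks)
        res @p]
    ;; later deliveries to an already-delivered promise are ignored
    (doseq [f fs] (future-cancel f))
    res))

(first-result #(do (Thread/sleep 2000) :slow)
              #(do (Thread/sleep 100) :fast))
;; => :fast
```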

soulcheck answered Oct 09 '22 05:10


It hasn't reached an official release yet, but core.async looks like it might be an interesting way of solving your problem, and other asynchronous problems, very neatly.

The leiningen incantation for core.async is (currently) as follows:

[org.clojure/core.async "0.1.0-SNAPSHOT"]

And here's some code to make a function that will take a number of time-consuming functions, and block until one of them returns.

(require '[clojure.core.async :refer [>!! chan alts!! thread]])

(defn return-first [& ops]
  (let [v (map vector ops (repeatedly chan))]
    (doseq [[op c] v]
      (thread (>!! c (op))))
    ;; alts!! expects a vector of channels, so build one with mapv
    (let [[value _] (alts!! (mapv second v))]
      value)))

;; Make sure the function returns what we expect with a simple Thread/sleep
(assert (= (return-first (fn [] (Thread/sleep 3000) 3000)
                         (fn [] (Thread/sleep 2000) 2000)
                         (fn [] (Thread/sleep 5000) 5000))
            2000))

In the sample above:

  • chan creates an asynchronous channel
  • >!! puts a value onto a channel
  • thread executes the body in another thread
  • alts!! takes a vector of channels, and returns when a value appears on any of them

There's way more to it than this, and I'm still getting my head round it, but there's a walkthrough here: https://github.com/clojure/core.async/blob/master/examples/walkthrough.clj

And David Nolen's blog has some great, if mind-boggling, posts on it (http://swannodette.github.io/)

Edit

I've just seen that Michał Marczyk has answered a very similar question, but better, under "with Clojure threading long running processes and comparing their returns", and his approach allows you to cancel/short-circuit.

Daniel Neal answered Oct 09 '22 05:10


What you want is Java's CompletionService. I don't know of any wrapper around this in Clojure, but it wouldn't be hard to do with interop. The example below is loosely based on the example on the JavaDoc page for ExecutorCompletionService.

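(import '(java.util.concurrent ExecutorCompletionService Executors))

(defn f [col]
  (let [pool    (Executors/newCachedThreadPool)
        cs      (ExecutorCompletionService. pool)
        ;; doall forces submission; with a lazy map nothing is submitted and .take blocks forever
        futures (doall (map #(.submit cs %) col))
        result  (.get (.take cs))]
    ;; doseq rather than map, so the cancellations actually run
    (doseq [fut futures] (.cancel fut true))
    (.shutdown pool)
    result))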
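
As a possible shortcut, ExecutorService's invokeAny does the same "first result wins, cancel the rest" bookkeeping itself. The sketch below assumes the tasks are zero-argument Clojure functions (which implement Callable); the name invoke-any is hypothetical. As with any cancellation on the JVM, tasks are only interrupted, so code that ignores interruption will run to completion.

```clojure
(import '(java.util.concurrent Executors))

;; Sketch only: `invoke-any` is an illustrative wrapper name.
(defn invoke-any
  "Returns the result of the first thunk to complete successfully.
  invokeAny cancels the tasks that have not completed when it returns."
  [thunks]
  (let [pool (Executors/newCachedThreadPool)]
    (try
      ;; a Clojure vector is a java.util.Collection of Callables
      (.invokeAny pool (vec thunks))
      (finally
        ;; interrupt anything still running and release the threads
        (.shutdownNow pool)))))

(invoke-any [#(do (Thread/sleep 2000) :slow)
             #(do (Thread/sleep 100) :fast)])
;; => :fast
```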

Kevin answered Oct 09 '22 03:10