I have a number of (unevaluated) expressions held in a vector; [ expr1 expr2 expr3 ... ]
What I wish to do is hand each expression to a separate thread and wait until one returns a value. At that point I'm not interested in the results from the other threads and would like to cancel them to save CPU resource.
( I realise that this could cause non-determinism in that different runs of the program might cause different expressions to be evaluated first. I have this in hand. )
Is there a standard / idiomatic way of achieving the above?
Here's my take on it.
Basically you have to resolve a global promise inside each of your futures, then return a vector containing future list and the resolved value and then cancel all the futures in the list:
(defn run-and-cancel [& expr]
(let [p (promise)
run-futures (fn [& expr] [(doall (map #(future (deliver p (eval %1))) expr)) @p])
[fs res] (apply run-futures expr)]
(map future-cancel fs)
res))
It's not reached an official release yet, but core.async
looks like it might be an interesting way of solving your problem - and other asynchronous problems, very neatly.
The leiningen incantation for core.async
is (currently) as follows:
[org.clojure/core.async "0.1.0-SNAPSHOT"]
And here's some code to make a function that will take a number of time-consuming functions, and block until one of them returns.
(require '[clojure.core.async :refer [>!! chan alts!! thread]]))
(defn return-first [& ops]
(let [v (map vector ops (repeatedly chan))]
(doseq [[op c] v]
(thread (>!! c (op))))
(let [[value channel] (alts!! (map second v))]
value)))
;; Make sure the function returns what we expect with a simple Thread/sleep
(assert (= (return-first (fn [] (Thread/sleep 3000) 3000)
(fn [] (Thread/sleep 2000) 2000)
(fn [] (Thread/sleep 5000) 5000))
2000))
In the sample above:
chan
creates an asynchronous channel>!!
puts a value onto a channelthread
executes the body in another threadalts!!
takes a vector of channels, and returns when a value appears on any of themThere's way more to it than this, and I'm still getting my head round it, but there's a walkthrough here: https://github.com/clojure/core.async/blob/master/examples/walkthrough.clj
And David Nolen's blog has some great, if mind-boggling, posts on it (http://swannodette.github.io/)
Edit
Just seen that Michał Marczyk has answered a very similar question, but better, here, and it allows you to cancel/short-circuit. with Clojure threading long running processes and comparing their returns
What you want is Java's CompletionService. I don't know of any wrapper around this in clojure, but it wouldn't be hard to do with interop. The example below is loosely based around the example on the JavaDoc page for the ExecutorCompletionService.
(defn f [col]
(let [cs (ExecutorCompletionService. (Executors/newCachedThreadPool))
futures (map #(.submit cs %) col)
result (.get (.take cs))]
(map #(.cancel % true) futures)
result))
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With