Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Clojure core.async, any way to control number of threads in that (go...) thread pool?

By default (go..) will use twice the number of cores + 42 threads for the thread pool. Is there any way I can set the number of threads, or number of CPUs that the code can use, through setting an environment variable or sth?

On linux machine I can set number of CPU using taskset, e.g. taskset -c 0,1 my_Java_or_Clojure_program, although taskset seems not effective on the number returned by (-> (java.lang.Runtime/getRuntime) .availableProcessors).

like image 486
Kevin Zhu Avatar asked Sep 13 '13 06:09

Kevin Zhu


2 Answers

The current accepted answer was valid up to this commit so basically now you have two cases:

  • If you want to just change the max number of threads in the pool, you pass the number as Java property clojure.core.async.pool-size (it defaults to 8)

  • If you want to replace the ExecutorService, you use the same trick of alter-var-root but targeting the new implementation (there is a protocol to implement):

     (ns your-app.threadpool
       (:require [clojure.core.async.impl.protocols :as protocols]
                 [clojure.core.async.impl.concurrent :as conc]
                 [clojure.core.async.impl.exec.threadpool :as tp])
       (:import java.util.concurrent.Executors))
    
     (defonce my-executor
       (let [executor-svc (Executors/newFixedThreadPool
                           1
                           (conc/counted-thread-factory "async-dispatch-%d" true))]
         (reify protocols/Executor
            (protocols/exec [this r]
              (.execute executor-svc ^Runnable r)))))
    
     (alter-var-root #'clojure.core.async.impl.dispatch/executor
                     (constantly (delay my-executor)))
    
like image 65
Andrea Richiardi Avatar answered Oct 09 '22 16:10

Andrea Richiardi


In the current Clojure version of core.async, the thread pool executor is located in the clojure.core.async.impl.dispatch namespace. You can alter the executor var and supply a custom thread pool ExecutorService.

(ns sandbox
  (:require [clojure.core.async.impl.concurrent :as conc]
            [clojure.core.async.impl.exec.threadpool :as tp]
            [clojure.core.async :as async]))

(defonce my-executor
  (java.util.concurrent.Executors/newFixedThreadPool
   1
   (conc/counted-thread-factory "my-async-dispatch-%d" true)))

(alter-var-root #'clojure.core.async.impl.dispatch/executor
                (constantly (delay (tp/thread-pool-executor my-executor))))

(async/go
 (println 
  (Thread/currentThread))) ;=> #<Thread Thread[my-async-dispatch-1,5,main]>

Note: Core.async is still in alpha, so, hopefully, this will change in the future.

like image 44
Jared314 Avatar answered Oct 09 '22 15:10

Jared314