
How to stop go block in ClojureScript / core.async?

Is there an elegant way to stop a running go block?

(without introducing a flag and polluting the code with checks/branches)

(ns example
  (:require-macros [cljs.core.async.macros :refer [go]])
  (:require        [cljs.core.async        :refer [<! timeout]]))

(defn some-long-task []
  (go
    (println "entering")

    ; some complex long-running task (e.g. fetching something via network)
    (<! (timeout 1000)) 
    (<! (timeout 1000))
    (<! (timeout 1000))
    (<! (timeout 1000))

    (println "leaving")))

; run the task
(def task (some-long-task))

; later, realize we no longer need the result and want to cancel it 
; (stop! task)

oshyshko asked Sep 14 '15 17:09


3 Answers

Sorry, this is not possible with core.async today. What you get back from creating a go block is a normal channel onto which the result of the block will be put; it does not give you any handle to the block itself.
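
Since the returned channel only carries the result, one common workaround is to let the block itself watch a second channel and give up when that channel closes. The following is a minimal sketch of that idea, not a core.async feature: `cancellable-task`, the `:cancel`/`:done` keys, and the `example.cancel` namespace are hypothetical names, and the one-second `timeout` stands in for real work as in the question.

```clojure
(ns example.cancel
  (:require-macros [cljs.core.async.macros :refer [go go-loop]])
  (:require        [cljs.core.async        :refer [<! alts! chan close! timeout]]))

(defn cancellable-task
  "Hypothetical helper: performs `steps` one-second waits.
  Returns {:cancel cancel-chan :done result-chan}. Closing :cancel
  makes the loop stop at its next parking point."
  [steps]
  (let [cancel (chan)]
    {:cancel cancel
     :done   (go-loop [i 0]
               (if (= i steps)
                 :finished
                 ;; Park on whichever is ready first: the cancel channel
                 ;; or the (simulated) unit of work. :priority true makes
                 ;; cancel win if both are ready at the same time.
                 (let [[_ port] (alts! [cancel (timeout 1000)] :priority true)]
                   (if (= port cancel)
                     :cancelled
                     (recur (inc i))))))}))

;; run the task
(def task (cancellable-task 4))

;; later, realize we no longer need the result
(close! (:cancel task))

;; the result channel then yields :cancelled instead of :finished
(go (println "task ended with" (<! (:done task))))
```

The block can only stop where it parks (`alts!` here), so work already in progress between two parking points still runs to completion.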

Arthur Ulfeldt answered Nov 13 '22 19:11


As stated in Arthur's answer, you cannot terminate a go block immediately, but since your example indicates a multi-phase task (made up of sub-tasks), an approach like this might work:

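;; As written, this example targets JVM Clojure (dummy-task uses Thread/sleep).
;; The namespace name is illustrative.
(ns example.tasks
  (:require [clojure.core.async :as async
             :refer [chan close! go go-loop <! >!]]))

(defn task-processor
  "Takes an initial state value and a number of tasks (fns). Puts tasks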
  on a work queue channel and then executes them in a go-loop, with
  each task passed the current state. A task's return value is used as
  input for next task. When all tasks are processed or queue has been
  closed, places current result/state onto a result channel. To allow
  nil values, result is wrapped in a map:

  {:value state :complete? true/false}

  This fn returns a map of {:queue queue-chan :result result-chan}"
  [init & tasks]
  (assert (pos? (count tasks)))
  (let [queue  (chan)
        result (chan)]
    (async/onto-chan queue tasks)
    (go-loop [state init, i 0]
      (if-let [task (<! queue)]
        (recur (task state) (inc i))
        (do (prn "task queue finished/terminated")
            (>! result {:value state :complete? (== i (count tasks))}))))
    {:queue  queue
     :result result}))

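;; Thread/sleep is JVM-only and blocks the thread running the go block;
;; it stands in for real work here
(defn dummy-task [x] (prn :task x) (Thread/sleep 1000) (inc x))

;; kick off the tasks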
(def proc (apply task-processor 0 (repeat 100 dummy-task)))

;; result handler
(go
  (let [res (<! (:result proc))]
    (prn :final-result res)))

;; to stop the queue after current task is complete
;; in this example it might take up to an additional second
;; for the terminated result to be delivered
(close! (:queue proc))

toxi answered Nov 13 '22 17:11


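On the JVM you may want to use future and future-cancel for such a task (ClojureScript does not provide futures). future-cancel only interrupts the underlying thread, so the loop has to check the interrupted flag itself: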

(def f (future (while (not (Thread/interrupted)) (your-function ... ))))
(future-cancel f)
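
As a slightly fuller illustration of the snippet above, here is a sketch for JVM Clojure in which the work is a blocking call rather than a flag check. `start-worker` and `ticks` are hypothetical names, and `Thread/sleep` stands in for the real task; a blocking call like this throws `InterruptedException` when the future is cancelled, which ends the loop.

```clojure
;; JVM Clojure only: `future` runs its body on another thread.
(defn start-worker
  "Hypothetical worker: increments `ticks` every 100 ms until the
  thread running it is interrupted."
  [ticks]
  (future
    (try
      (while true
        (Thread/sleep 100)   ; throws InterruptedException once cancelled
        (swap! ticks inc))
      (catch InterruptedException _
        :interrupted))))

(def ticks  (atom 0))
(def worker (start-worker ticks))

;; let it run for a moment, then cancel it
(Thread/sleep 350)
(future-cancel worker)      ; => true, the future was still running
(future-cancelled? worker)  ; => true
```

A body that neither blocks in an interruptible call nor checks `(Thread/interrupted)` keeps running after `future-cancel`, which is what the question linked below is about.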

See also: Why do cancelled Clojure futures continue using CPU?

edbond answered Nov 13 '22 17:11