How to implement a parallel logical or with early termination in Clojure

I would like to define a predicate that, taking as input some predicates with corresponding inputs (they could be given as a lazy sequence of calls), runs them in parallel and computes the logical or of the results, in such a way that, the moment a predicate call terminates returning true, the whole computation also terminates (returning true).

Apart from saving time, this would also help avoid non-termination in some cases (some predicate calls may not terminate). In fact, if non-termination is interpreted as a third, undefined value, this predicate simulates the or operation in Kleene's K3 logic (the join in the initial centered Kleene algebra).

Something similar is presented here for the Haskell family. Is there any (preferably simple) way to do this in Clojure?

EDIT: I decided to add some clarifications after reading the comments.

(a) First of all, what happens after the thread pool gets exhausted is of less importance. I think creating a thread pool large enough for our needs is a reasonable convention.

(b) The most crucial requirement is that the predicate calls start running in parallel and, once a predicate call terminates returning true, all the other threads running get interrupted. The intended behavior is that:

  • if there is a predicate call returning true: the parallel or returns true
  • else if there is a predicate call that does not terminate: the parallel or does not terminate
  • else: the parallel or returns false

In other words, it behaves like the join in the 3-element lattice given by false<undefined<true, with undefined representing non-termination.
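To make the lattice concrete, here is a minimal sketch of that join on ordinary values. The keyword :undefined is only a stand-in chosen for this example (real non-termination cannot be inspected as a value), and the names k3-rank and k3-or are hypothetical:

```clojure
;; Rank of each element in the chain false < undefined < true.
(def k3-rank {false 0, :undefined 1, true 2})

(defn k3-or
  "Join (least upper bound) of two elements of the 3-element chain."
  [a b]
  (max-key k3-rank a b))

;; Folding k3-or over a collection gives the value pany is meant to
;; compute, with false as the identity element.
(defn k3-any [vals]
  (reduce k3-or false vals))
```

For instance, (k3-any [false :undefined true]) is true, while (k3-any [false :undefined]) is :undefined, which corresponds to the parallel or not terminating.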

(c) The parallel or should be able to take as input many predicates and many predicate-inputs (each one corresponding to a predicate). But it would be even better if it took as input a lazy sequence. Then, naming the parallel or pany (for "parallel any"), we could have calls like the following:

  • (pany (map (comp eval list) predicates inputs))
  • (pany (map (comp eval list) predicates (repeat input)))
  • (pany (map (comp eval list) (repeat predicate) inputs)) which is equivalent to (pany (map predicate (unchunk inputs)))
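The unchunk used in the last call is not a core function. A common definition, which I am assuming here, re-wraps the sequence so that consumers such as map see one element at a time instead of a 32-element chunk:

```clojure
(defn unchunk
  "Returns a lazy seq over s that is never chunked, so a lazy consumer
  realises one element per step."
  [s]
  (lazy-seq
    (when-let [s (seq s)]
      (cons (first s) (unchunk (rest s))))))

;; Counting calls shows the effect: only one element is mapped when
;; just the first result is requested.
(def calls (atom 0))
(first (map (fn [x] (swap! calls inc) x) (unchunk (range 100))))
```

Without unchunk, mapping over a chunked source like (range 100) would typically evaluate the predicate on a whole chunk as soon as the first result is requested.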

As a final remark, I think that it is quite natural to ask for things like pany, a dual pall or a mechanism for building such early-terminating parallel reductions to be easily implementable or even built-in in a parallelism-oriented language like Clojure.

asked Apr 30 '19 16:04 by peter pun


1 Answer

I will define our predicates in terms of a reducing function. Practically, we could reimplement all of the Clojure iteration functions to support this parallel operation, but I'll just use reduce as an example.

I'll define a computation function. I'll just use the same one throughout, but nothing stops you from having many. The function is "true" once its accumulator exceeds 1000.

(defn computor [acc val]
  (let [new (+' acc val)]
    (if (> new 1000) (reduced new) new)))

(reduce computor 0 (range))
;; => 1035

(reduce computor 0 (range Long/MIN_VALUE 0))
;; =>
;; ...this is a proxy for a non-returning computation

;; wrap these up in a form suitable for application of reduction
(def predicates [[computor 0 (range)] 
                 [computor 0 (range Long/MIN_VALUE 0)]])

Now let's get to the meat of this. I want to take a step in each computation, and if one of the computations completes, I want to return its result. In practice, taking one step at a time with pmap is very slow: the units of work are too small to be worth threading. So here each cycle does 1000 iterations of each unit of work before moving on. You'd probably tune this based on your workload and the cost of a step.

(defn p-or-reducer* [reductions]
        (let [splits (map #(split-at 1000 %) reductions) ;; do at least 1000 iterations per cycle
              complete (some #(if (empty? (second %)) (last (first %))) splits)]
          (or complete (recur (map second splits)))))

I then wrap this in a driver.

(defn p-or [s]
  (p-or-reducer* (map #(apply reductions %) s)))

(p-or predicates)
;; => 1035

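Where to insert the CPU parallelism? Replacing map with pmap in p-or-reducer* is the place to do it, with one caveat: split-at is lazy, so the function handed to pmap also has to force its chunk, otherwise the work still happens sequentially inside some on the calling thread. I suggest parallelising only this first operation, as it is what drives the reducing sequences to compute.

(defn p-or-reducer* [reductions]
  (let [splits (pmap (fn [r]
                       (let [[head tail] (split-at 1000 r)]
                         [(doall head) tail])) ;; realise up to 1000 iterations on the pmap worker
                     reductions)
        complete (some #(if (empty? (second %)) (last (first %))) splits)]
    (or complete (recur (map second splits)))))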

(def parallelism-tester (conj (vec (repeat 40000 [computor 0 (range Long/MIN_VALUE 0)]))
                             [computor 0 (range)]))

(p-or parallelism-tester) ;; terminates even though the first 40K predicates will not

It's extremely hard to define a performant generic version of this. Without knowing the cost per iteration, an efficient parallelism strategy is hard to derive: if one iteration takes 10s then we'd probably take a single step at a time. If it takes 100ns then we need to take many steps at a time.
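For comparison, here is a rough sketch of a different technique from the reduction-stepping above: one future per predicate call, with a promise for the first true result, which is closer to the pany described in the question. It assumes the calls are passed as zero-argument functions, that the input is finite, and that no call throws. Note that future-cancel only interrupts a thread, so it stops calls that are blocked or that check for interruption, not a tight CPU loop like the reduce over computor.

```clojure
;; Sketch only: taking thunks (zero-argument functions) is an assumption
;; of this example, not something fixed by the question.
(defn pany
  [thunks]
  (let [thunks  (vec thunks)              ; forces the input, so it must be finite
        result  (promise)
        pending (atom (count thunks))
        futures (mapv (fn [thunk]
                        (future
                          (if (thunk)
                            (deliver result true)
                            ;; the last call to come back falsy settles the answer
                            (when (zero? (swap! pending dec))
                              (deliver result false)))))
                      thunks)]
    (if (empty? thunks)
      false
      (let [answer @result]               ; blocks until true, or until all are falsy
        (run! future-cancel futures)      ; interrupt whatever is still running
        answer))))
```

A call such as (pany (map (fn [p x] #(p x)) predicates inputs)) would then pair each predicate with its input without going through eval. If no call returns true and at least one never returns, the deref blocks forever, which matches the intended three-valued behaviour.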

answered Oct 21 '22 05:10 by pete23