Handling errors with clojure core.async pipeline

I am trying to understand the correct way to handle errors when using core.async/pipeline. My pipeline is the following:

input     --> xf-run-computation --> first-out
first-out --> xf-run-computation --> last-out

Here xf-run-computation makes an http call and returns the response. However, some of these responses will be errors. What's the best way to handle them? My solution is to split the output channels into success-values and error-values and then merge the error values back into a single channel:

(let [[success-values1 error-values1] (split fn-to-split first-out)
      [success-values2 error-values2] (split fn-to-split last-out)
      errors (merge [error-values1 error-values2])]
  (pipeline 4 first-out xf-run-computation input)
  (pipeline 4 last-out  xf-run-computation success-values1)
  [last-out errors])

So my function will return the last results and the errors.

Michel Uncini asked Dec 28 '16 17:12


2 Answers

Generally speaking, "the" correct way probably depends on your application's needs, but given your problem description, I think there are three cases you need to consider:

  1. xf-run-computation returns data that your business logic would see as errors,
  2. xf-run-computation throws an exception and
  3. given that http calls are involved, some runs of xf-run-computation might never finish (or not finish in time).

Regarding point 3., the first thing you should consider is using pipeline-blocking instead of pipeline.
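
As a minimal sketch of that switch, assuming a hypothetical slow-inc function that stands in for a blocking http call (the name and the 50 ms sleep are illustrative only), pipeline-blocking takes the same arguments as pipeline:

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-in for a blocking http call.
(defn slow-inc [x]
  (Thread/sleep 50)   ; simulate blocking I/O
  (inc x))

(def in  (async/to-chan (range 5)))
(def out (async/chan 10))

;; pipeline-blocking is the variant documented for blocking operations;
;; its argument list matches pipeline: n, to, xf, from.
(async/pipeline-blocking 4 out (map slow-inc) in)

;; out is closed once in is exhausted, so into terminates.
(def result (async/<!! (async/into [] out)))
```

The outputs keep the order of the inputs even though up to four calls run in parallel, so later stages can still rely on ordering.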

I think your question is mostly related to point 1. The basic idea is that xf-run-computation needs to return a data structure (say a map or a record) which clearly marks a result as an error or a success, e.g. {:title nil :body nil :status "error"}. This gives you several options for dealing with the situation:

  • all your later code simply ignores input data which has :status "error". I.e., your xf-run-computation would contain a line like (when (not (= (:status input) "error")) (run-computation input)),

  • you could run a filter on all results between the pipeline calls and filter them as needed (note that filter can also be used as a transducer in a pipeline, which makes the old filter> and filter< functions of core.async obsolete),

  • you use async/split, as you suggested and as Alan Thompson shows in his answer, to filter out the error values to a separate error channel. There is no real need to have a second error channel for your second pipeline if you're going to merge the values anyway; you can simply re-use your error channel.
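
The second option can be sketched roughly as follows. The run-computation and error? functions, the :input and :value keys and the "odd inputs fail" rule are all hypothetical stand-ins for your real http logic; only the :status convention comes from the description above.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical computation: odd inputs "fail", even inputs succeed.
(defn run-computation [x]
  (if (odd? x)
    {:input x :status "error"}
    {:input x :status "ok" :value (* 10 x)}))

(defn error? [result]
  (= "error" (:status result)))

(def in        (async/to-chan (range 6)))
(def first-out (async/chan 10))
(def last-out  (async/chan 10))

;; Stage 1 tags every result with a :status.
(async/pipeline 4 first-out (map run-computation) in)

;; Stage 2 drops errors with an ordinary transducer before doing more work.
(async/pipeline 4 last-out
                (comp (remove error?)
                      (map #(update % :value inc)))
                first-out)

(def results (async/<!! (async/into [] last-out)))
```

Here the error results are simply discarded; if you need them later, async/split on first-out (as in the third option) keeps them on a separate channel instead.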

For point 2., the problem is that any exception in xf-run-computation happens in another thread and will not simply propagate back to your calling code. But you can make use of the ex-handler argument to pipeline (and pipeline-blocking). You could simply filter out all exceptions, put them on a separate exception channel, or catch them and turn them into errors (potentially putting them back on the result channel or another error channel). The last option only makes sense if the exception gives you enough information, e.g. an id or something else that allows you to tie the exception back to the input which caused it. You could arrange for this in xf-run-computation itself, i.e. catch any exception thrown from a third-party library such as the http client and attach the input to it.
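
A minimal sketch of the ex-handler approach, assuming a hypothetical risky function that throws for one input and a hypothetical ex->error handler that turns the exception into an error map. As far as I understand the core.async docstrings, the handler is called with the Throwable and any non-nil return value is placed on the output channel:

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical computation that throws for one particular input.
;; ex-info carries the input so the error can be tied back to it.
(defn risky [x]
  (if (= x 3)
    (throw (ex-info "boom" {:input x}))
    (* 2 x)))

;; Hypothetical handler: converts the exception into an error map.
(defn ex->error [^Throwable e]
  {:status "error" :message (.getMessage e) :data (ex-data e)})

(def in  (async/to-chan (range 5)))
(def out (async/chan 10))

;; Arguments: n, to, xf, from, close?, ex-handler.
(async/pipeline 2 out (map risky) in true ex->error)

(def results (async/<!! (async/into [] out)))
```

The error map ends up on the same channel as the successful results, so a later async/split or filter step can separate them again.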

For point 3, the canonical answer in core.async would be to point to a timeout channel, but this doesn't make much sense in relation to pipeline. A better idea is to ensure on your http calls that a timeout is set, e.g. the :timeout option of http-kit or :socket-timeout and :conn-timeout of clj-http. Note that these options will usually result in an exception on timeout.

schaueho answered Sep 20 '22 16:09


Here is an example that does what you are suggesting. Beginning with (range 10) it first filters out the multiples of 5, then the multiples of 3.

(ns tst.clj.core
  (:use clj.core
        clojure.test )
  (:require
    [clojure.core.async :as async]
    [clojure.string :as str]
  )
)

(defn err-3
  "'fail' for multiples of 3"
  [x]                ; docstring must come before the argument vector
  (if (zero? (mod x 3))
    (+ x 300)       ; error case
    x))             ; non-error

(defn err-5
  "'fail' for multiples of 5"
  [x]
  (if (zero? (mod x 5))
    (+ x 500)       ; error case
    x))             ; non-error

(defn is-ok?
  "Returns true if the value is not 'in error' (>=100)"
  [x]
  (< x 100))

(def ch-0  (async/to-chan (range 10)))
(def ch-1  (async/chan 99))
(def ch-2  (async/chan 99))

(deftest t-2
  (let [
        _                         (async/pipeline 1 ch-1 (map err-5) ch-0)
        [ok-chan-1 fail-chan-1]   (async/split is-ok? ch-1 99 99)
        _                         (async/pipeline 1 ch-2 (map err-3) ok-chan-1)
        [ok-chan-2 fail-chan-2]   (async/split is-ok? ch-2 99 99)

        ok-vec-2                  (async/<!! (async/into [] ok-chan-2))
        fail-vec-1                (async/<!! (async/into [] fail-chan-1))
        fail-vec-2                (async/<!! (async/into [] fail-chan-2))
  ]
    (is (= ok-vec-2 [1 2 4 7 8]))
    (is (= fail-vec-1 [500 505]))
    (is (= fail-vec-2 [303 306 309]))))

Rather than return the errors, I would probably just log them as soon as they are detected and then forget about them.
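
A rough sketch of that idea, assuming a hypothetical log-error! function (a real application would call its logging library there) and the same "values of 100 or more are errors" convention as above:

```clojure
(require '[clojure.core.async :as async])

;; Collected here only so the effect can be inspected afterwards.
(def logged (atom []))

;; Hypothetical logger; replace with a call to your logging library.
(defn log-error! [v]
  (swap! logged conj v)
  (println "error value:" v))

(def source      (async/to-chan [1 500 2 505]))
(def split-chans (async/split #(< % 100) source 10 10))
(def ok-chan     (first split-chans))
(def fail-chan   (second split-chans))

;; Drain the failure channel in the background, logging each value.
;; The loop ends when fail-chan is closed, i.e. after source is closed.
(def logger-done
  (async/go-loop []
    (when-some [v (async/<! fail-chan)]
      (log-error! v)
      (recur))))

(def ok-vec (async/<!! (async/into [] ok-chan)))

;; Wait for the logger to finish before inspecting what was logged.
(async/<!! logger-done)
```

With this arrangement the calling code only ever sees ok-chan, and the failures never have to be returned or merged.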

Alan Thompson answered Sep 18 '22 16:09