I am trying to understand the correct way to handle errors when using core.async/pipeline. My pipeline is the following:
input --> xf-run-computation --> first-out
first-out --> xf-run-computation --> last-out
where xf-run-computation makes an http call and returns the response. However, some of these responses will be errors. What's the best way to handle these errors?
My solution is to split the output channels into success-values and error-values, and then merge the error channels back into a single channel:
(let [[success-values1 error-values1] (split fn-to-split first-out)
      [success-values2 error-values2] (split fn-to-split last-out)
      errors (merge [error-values1 error-values2])]
  (pipeline 4 first-out xf-run-computation input)
  (pipeline 4 last-out xf-run-computation success-values1)
  [last-out errors])
So my function will return the last results and the errors.
Generally speaking, "the" correct way probably depends on your application's needs, but given your problem description, I think there are three things you need to consider:
1. xf-run-computation returns data that your business logic would see as errors,
2. xf-run-computation throws an exception, and
3. xf-run-computation might never finish (or not finish in time).
Regarding point 3, the first thing you should consider is using pipeline-blocking instead of pipeline, since pipeline is meant for computational work and blocking operations such as http calls belong in pipeline-blocking.
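As a rough sketch of the pipeline-blocking suggestion: the call has the same shape as pipeline, so switching is a one-word change. Here slow-fetch is a hypothetical stand-in for the blocking http call, and the channel names and buffer sizes are only assumptions for the example.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-in for a blocking http call.
(defn slow-fetch [id]
  (Thread/sleep 50)
  {:id id :status "ok"})

(def input  (async/to-chan (range 4)))
(def output (async/chan 10))

;; Same argument order as pipeline: parallelism, to-channel, transducer, from-channel.
(async/pipeline-blocking 4 output (map slow-fetch) input)

;; output is closed once input is drained, so into completes.
(def results (async/<!! (async/into [] output)))
```

The results arrive in input order even though the calls run in parallel.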
I think your question is mostly related to point 1. The basic idea is that xf-run-computation needs to return a data structure (say a map or a record) which clearly marks a result as an error or a success, e.g. {:title nil :body nil :status "error"}. This gives you some options for dealing with the situation:
- all your later code simply ignores input data which has :status "error", i.e. your xf-run-computation would contain a line like (when (not (= (:status input) "error")) (run-computation input)),
- you could run a filter on all results between the pipeline calls and filter them as needed (note that filter can also be used as a transducer in a pipeline, which makes the old filter> and filter< functions of core.async obsolete),
- you use async/split, like you suggested and as Alan Thompson shows in his answer, to filter out the error values to a separate error channel. There is no real need to have a second error channel for your second pipeline if you're going to merge the values anyway; you can simply re-use your error channel.
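The second option above could look roughly like the sketch below. It assumes results are maps tagged with :status; run-computation and ok? are hypothetical names, and the "odd ids fail" rule only simulates failing http responses.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-in for the http call: odd ids "fail".
(defn run-computation [{:keys [id] :as input}]
  (if (odd? id)
    (assoc input :status "error")
    (assoc input :status "ok" :body (str "result-" id))))

(defn ok? [result]
  (not= (:status result) "error"))

(def input     (async/to-chan (map (fn [id] {:id id}) (range 6))))
(def first-out (async/chan 10))
(def last-out  (async/chan 10))

;; Stage one tags every result; the filter transducer drops the errors
;; so that stage two only ever sees successful results.
(async/pipeline 4 first-out (comp (map run-computation) (filter ok?)) input)
(async/pipeline 4 last-out (map run-computation) first-out)

(def results (async/<!! (async/into [] last-out)))
```

Note that this variant silently discards the failed results; if they need to be reported, splitting them off to an error channel is the better fit.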
For point 2, the problem is that any exception in xf-run-computation happens in another thread and will not simply propagate back to your calling code. But you can make use of the ex-handler argument to pipeline (and pipeline-blocking). You could simply filter out all exceptions, put the result on a separate exception channel, or try to catch them and turn them into errors (potentially putting them back on the result channel or another error channel). The last option only makes sense if the exception gives you enough information, e.g. an id or something else that allows you to tie the exception back to the input which caused it. You could arrange for this in xf-run-computation (i.e. catch any exception thrown from a third-party library like the http call).
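A minimal sketch of the ex-handler approach, assuming the computation throws an ex-info that carries the offending input. The names risky and ex->error are hypothetical. The handler is passed as the sixth argument, after the close? flag, and any non-nil value it returns is put on the output channel.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical computation that throws for one particular input.
(defn risky [x]
  (if (= x 2)
    (throw (ex-info "boom" {:input x}))
    (* x 10)))

;; Turns the Throwable into an error map; ex-data ties it back to the input.
(defn ex->error [^Throwable t]
  {:status "error" :message (.getMessage t) :data (ex-data t)})

(def input  (async/to-chan (range 4)))
(def output (async/chan 10))

;; Arguments: parallelism, to, transducer, from, close?, ex-handler.
(async/pipeline 2 output (map risky) input true ex->error)

(def results (async/<!! (async/into [] output)))
```

Here the error map takes the place of the failed result on the output channel, so downstream code can split or filter on it like any other tagged error.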
For point 3, the canonical answer in core.async would be to point to a timeout channel, but this doesn't make much sense in relation to pipeline. A better idea is to ensure that a timeout is set on your http calls, e.g. the :timeout option of http-kit or :socket-timeout and :conn-timeout of clj-http. Note that these options will usually result in an exception on timeout.
Here is an example that does what you are suggesting. Beginning with (range 10), it first filters out the multiples of 5, then the multiples of 3.
(ns tst.clj.core
  (:require
    [clojure.core.async :as async]
    [clojure.test :refer [deftest is]]))

(defn err-3
  "'fail' for multiples of 3"
  [x]
  (if (zero? (mod x 3))
    (+ x 300) ; error case
    x))       ; non-error

(defn err-5
  "'fail' for multiples of 5"
  [x]
  (if (zero? (mod x 5))
    (+ x 500) ; error case
    x))       ; non-error

(defn is-ok?
  "Returns true if the value is not 'in error' (error values are >= 100)"
  [x]
  (< x 100))

(def ch-0 (async/to-chan (range 10)))
(def ch-1 (async/chan 99))
(def ch-2 (async/chan 99))

(deftest t-2
  (let [_                       (async/pipeline 1 ch-1 (map err-5) ch-0)
        ;; split returns [channel-of-passing-values channel-of-failing-values]
        [ok-chan-1 fail-chan-1] (async/split is-ok? ch-1 99 99)
        _                       (async/pipeline 1 ch-2 (map err-3) ok-chan-1)
        [ok-chan-2 fail-chan-2] (async/split is-ok? ch-2 99 99)

        ok-vec-2                (async/<!! (async/into [] ok-chan-2))
        fail-vec-1              (async/<!! (async/into [] fail-chan-1))
        fail-vec-2              (async/<!! (async/into [] fail-chan-2))]
    (is (= ok-vec-2 [1 2 4 7 8]))
    (is (= fail-vec-1 [500 505]))
    (is (= fail-vec-2 [303 306 309]))))
Rather than return the errors, I would probably just log them as soon as they are detected and then forget about them.
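Logging as you go could be sketched like this, assuming the failed values arrive on a channel such as fail-chan-1 from the example. The errors channel below is a hypothetical stand-in for it, and the logged atom exists only so the effect can be inspected; real code would call a logging library instead of println.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-in for a channel of failed values.
(def errors (async/to-chan [500 505]))

;; Only here to make the result observable in this sketch.
(def logged (atom []))

;; Consume errors as they arrive; the loop ends when the channel closes.
(def done
  (async/go-loop []
    (when-some [e (async/<! errors)]
      (println "computation failed:" e)
      (swap! logged conj e)
      (recur))))

;; Block until the logger has drained the channel (only needed for the demo).
(async/<!! done)
```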