I'm writing an ETL process to read event-level data from a product database, transform/aggregate it, and write it to an analytics data warehouse. I'm using Clojure's core.async library to separate these processes into concurrently executing components. Here's what the main part of my code looks like right now:
(ns data-staging.main
  (:require [clojure.core.async :as async])
  (:use [clojure.core.match :only (match)]
        [data-staging.map-vecs]
        [data-staging.tables])
  (:gen-class))
(def submissions (make-table "Submission" "Valid"))
(def photos (make-table "Photo"))
(def videos (make-table "Video"))
(def votes (make-table "Votes"))
;; define channels used for sequential data processing
(def chan-in (async/chan 100))
(def chan-out (async/chan 100))
(defn write-thread
  "Loops forever, reading the next 10000 rows from `table` and
  putting a vector of the rows (maps) onto `chan-in`."
  [table]
  (while true
    (let [next-rows (get-rows table)]
      (async/>!! chan-in next-rows)
      (set-max table (:max-id (last next-rows))))))
(defn aggregator
  "Takes output from `chan-in` and aggregates it by coupon_id and date,
  then adds/drops any fields that are needed/not needed and puts the
  result onto `chan-out`."
  []
  (while true
    (->> (async/<!! chan-in)
         aggregate
         (async/>!! chan-out))))
(defn read-thread
  "Reads data from `chan-out` and inserts it into the analytics DB."
  []
  (while true
    (upsert (async/<!! chan-out))))
(defn -main []
  (async/thread (write-thread submissions))
  (async/thread (write-thread photos))
  (async/thread (write-thread videos))
  (async/thread-call aggregator)
  (async/thread-call read-thread))
As you can see, I'm putting each component on its own thread and using the blocking >!! call on the channels. It feels like using the non-blocking >! calls along with go routines might be better for this use case, especially for the database reads, which spend most of their time performing I/O and waiting for new rows in the product DB. Is this the case, and if so, what would be the best way to implement it? I'm a little unclear on all the tradeoffs between the two methods and on exactly how to use go routines effectively. Also, any other suggestions on how to improve the overall architecture would be much appreciated!
Personally, I think your use of threads here is probably the right call. The magic non-blocking nature of go-blocks comes from "parking," which is a special sort of pseudo-blocking that core.async's state machine uses. But since your database calls genuinely block instead of putting the state machine into a parked state, you'd just be blocking some thread from the core.async thread pool. It does depend on how long your synchronous calls take, so this is the sort of thing where benchmarks can be informative, but I strongly suspect threads are the right approach here.
The one exception is your aggregator function. It looks to me like it could just be folded into the definition of chan-out, as (def chan-out (async/map< aggregate chan-in)).
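In current core.async releases map< is deprecated in favor of transducers, so the same idea might be sketched as below. This is only an illustration: the `aggregate` here is a hypothetical stand-in for the function in the question, and the buffer sizes are simply copied from it.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-in for the question's `aggregate` function.
(defn aggregate [rows]
  {:row-count (count rows)})

(def chan-in (async/chan 100))

;; The transducer (map aggregate) is applied to every value put onto
;; chan-out, so no separate aggregator thread is needed.
(def chan-out (async/chan 100 (map aggregate)))

;; pipe copies every value from chan-in to chan-out.
(async/pipe chan-in chan-out)

;; Put one batch of rows in and take the aggregated value out.
(async/>!! chan-in [{:id 1} {:id 2}])
(def result (async/<!! chan-out))
```

If the aggregation turns out to be CPU-heavy, async/pipeline (which runs a transducer with a chosen level of parallelism) may be a better fit than a transducer on the channel itself.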
For a general overview of go-blocks versus threads, Martin Trojer wrote a good examination of the two approaches and which one is faster in which situation. The Cliff's Notes version is that go-blocks are good for adapting already-asynchronous libraries for use with core.async, while threads are good for making asynchronous processes out of synchronous parts. If your database had a callback-based API, for example, then go-blocks would be a definite win. But since it is synchronous, they are not a good fit.
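As a rough sketch of that split, a synchronous call can be wrapped in async/thread, which returns a channel, and a go block can then park on that channel. The `blocking-query` function is a made-up placeholder for a real database call, not part of any library.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-in for a synchronous database call.
(defn blocking-query [id]
  (Thread/sleep 50)
  {:id id})

;; async/thread runs its body on a separate thread and returns a channel
;; that receives the body's result.
(defn query-async [id]
  (async/thread (blocking-query id)))

;; The go block parks on that channel with <! instead of blocking a
;; thread from the go-block pool while the query runs.
(defn fetch-id [id]
  (async/<!! (async/go (:id (async/<! (query-async id))))))
```

Calling (fetch-id 7) blocks only the calling thread until the result arrives; the blocking work itself happens on the thread created by async/thread.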
I think it would be better to use go blocks, which give you non-blocking processes, for this ETL case.
I've written some very simple code that reproduces the ordered sequence of steps in an extract, transform and load task.
Enter the following code at your REPL:
(require '[clojure.core.async :as async :refer [<! >! <!! timeout chan alt! go]])

(def output (chan))

(defn extract [origin]
  (let [value-extracted   (chan)
        value-transformed (chan)
        value-loaded      (chan)]
    (go
      (<! (timeout (+ 100 (* 100 (rand-int 20))))) ; wait a little
      (>! value-extracted (str origin " > extracted ")))
    (go
      (<! (timeout (+ 100 (* 100 (rand-int 20))))) ; wait a little
      (>! value-transformed (str (<! value-extracted) " > transformed ")))
    (go
      (<! (timeout (+ 100 (* 100 (rand-int 20))))) ; wait a little
      (>! value-loaded (str (<! value-transformed) " > loaded ")))
    (go
      (<! (timeout (+ 100 (* 100 (rand-int 20))))) ; wait a little
      (>! output [origin (<! value-loaded)]))))

(go
  (loop [origins-already-loaded []]
    (let [[id message] (<! output)
          origins-updated (conj origins-already-loaded id)]
      (println message)
      (println origins-updated)
      (recur origins-updated))))
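Then evaluate this at the REPL:
(doseq [example (take 10 (range))] (extract example))
It prints something like the following (the order varies from run to run because of the random timeouts):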
1 > extracted > transformed > loaded
[1]
7 > extracted > transformed > loaded
[1 7]
0 > extracted > transformed > loaded
[1 7 0]
8 > extracted > transformed > loaded
[1 7 0 8]
3 > extracted > transformed > loaded
[1 7 0 8 3]
6 > extracted > transformed > loaded
[1 7 0 8 3 6]
2 > extracted > transformed > loaded
[1 7 0 8 3 6 2]
5 > extracted > transformed > loaded
[1 7 0 8 3 6 2 5]
9 > extracted > transformed > loaded
[1 7 0 8 3 6 2 5 9]
4 > extracted > transformed > loaded
[1 7 0 8 3 6 2 5 9 4]
UPDATE:
The bug I fixed was the use of <!! (timeout (+ 100 (* 100 (rand-int 20)))) inside a helper function, "wait-a-while", which I have since removed. That blocking take was stalling the other, non-blocking go processes.
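For anyone who still wants such a helper: the go macro does not rewrite code inside functions it calls, so <! cannot be used in an ordinary helper function. One possible workaround, shown here only as a sketch, is to have the helper return the timeout channel and do the parking take inside the go block. The names `wait-a-while` and `run-example` are illustrative.

```clojure
(require '[clojure.core.async :as async :refer [<! <!! go timeout]])

;; Returns the timeout channel instead of taking from it, so the helper
;; itself never blocks.
(defn wait-a-while []
  (timeout (+ 100 (* 100 (rand-int 20)))))

(defn run-example []
  ;; <! parks only this go block; <!! blocks the calling (REPL) thread
  ;; until the go block has finished.
  (<!! (go
         (<! (wait-a-while))
         :finished)))
```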