I'm trying to read millions of rows from a database and write to a text file.
This is a continuation of my question database dump to text file with side effects
My problem now seems to be that the logging doesn't happen until the program completes. Another indicator that I'm not processing lazily is that the text file isn't written at all until the program finishes.
Based on an IRC tip, it seems my issue likely has to do with :result-set-fn defaulting to doall in the clojure.java.jdbc/query part of the code. I have tried replacing this with a for comprehension, but memory consumption is still high, as it pulls the entire result set into memory.
How can I have a :result-set-fn that doesn't pull everything in like doall? How can I progressively write the log file as the program runs, rather than dumping everything once the -main execution is finished?
(let [db-spec local-postgres
      sql "select * from public.f_5500_sf "
      log-report-interval 1000
      fetch-size 100
      field-delim "\t"
      row-delim "\n"
      db-connection (doto (j/get-connection db-spec) (.setAutoCommit false))
      statement (j/prepare-statement db-connection sql :fetch-size fetch-size)
      joiner (fn [v] (str (join field-delim v) row-delim))
      start (System/currentTimeMillis)
      rate-calc (fn [r] (float (/ r (/ (- (System/currentTimeMillis) start) 100))))
      row-count (atom 0)
      result-set-fn (fn [rs] (lazy-seq rs))
      lazy-results (rest (j/query db-connection [statement]
                                  :as-arrays? true
                                  :row-fn joiner
                                  :result-set-fn result-set-fn))]
  (.setAutoCommit db-connection false)
  (info "Started dbdump session...")
  (with-open [^java.io.Writer wrtr (io/writer "output.txt")]
    (info "Running query...")
    (doseq [row lazy-results]
      (.write wrtr row)))
  (info (format "Completed write with %d rows" @row-count)))
You can use prepare-statement with the :fetch-size option. Otherwise the query itself is eager, despite the results being delivered in a lazy sequence.
prepare-statement requires a connection object, so you'll need to explicitly create one. Here's an example of how your usage might look:
(let [db-spec local-postgres
      sql "select * from big_table limit 500000 "
      fetch-size 10000 ;; or whatever's appropriate
      cnxn (doto (j/get-connection db-spec)
             (.setAutoCommit false))
      stmt (j/prepare-statement cnxn sql :fetch-size fetch-size)
      results (rest (j/query cnxn [stmt]))]
  ;; ...
  )
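For instance, one way to keep memory down is to do the writing inside the :result-set-fn itself, so the rows are consumed while the result set is still open instead of being realized by the default doall. A rough sketch, not from the original answer (the output file, tab delimiter and fetch size are only illustrative; db-spec is the same as above):

(let [db-spec local-postgres
      sql "select * from big_table limit 500000 "
      fetch-size 10000
      cnxn (doto (j/get-connection db-spec)
             (.setAutoCommit false))
      stmt (j/prepare-statement cnxn sql :fetch-size fetch-size)]
  (with-open [^java.io.Writer wrtr (io/writer "output.txt")]
    (j/query cnxn [stmt]
             :as-arrays? true
             :row-fn (fn [row] (str (join "\t" row) "\n"))
             :result-set-fn (fn [rows]
                              ;; rows is a lazy seq backed by the open result set;
                              ;; writing here consumes it incrementally instead of
                              ;; building the whole sequence in memory.
                              ;; (rest rows) skips the column-name header that
                              ;; :as-arrays? true puts at the front.
                              (doseq [line (rest rows)]
                                (.write wrtr line))))))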
Another option
Since the problem seems to be with query, try with-query-results. It's considered deprecated but is still there and works. Here's an example usage:
(let [db-spec local-postgres
      sql "select * from big_table limit 500000 "
      fetch-size 100 ;; or whatever's appropriate
      cnxn (doto (j/get-connection db-spec)
             (.setAutoCommit false))
      stmt (j/prepare-statement cnxn sql :fetch-size fetch-size)]
  (j/with-query-results results [stmt] ;; binds the results to `results`
    (doseq [row results]
      ;;
      )))
I have found a better solution: declare a cursor and fetch chunks of data from it within a transaction. For example:
(db/with-tx
  (db/execute! "declare cur cursor for select * from huge_table")
  (loop []
    (when-let [rows (-> "fetch 10 from cur" db/query not-empty)]
      (doseq [row rows]
        (process-a-row row))
      (recur))))
Here, db/with-tx, db/execute! and db/query are my own shortcuts declared in the db namespace:
(def ^:dynamic *db*
  {:dbtype "postgresql"
   :connection-uri <some db url>})

(defn query [& args]
  (apply jdbc/query *db* args))

(defn execute! [& args]
  (apply jdbc/execute! *db* args))

(defmacro with-tx
  "Runs a series of queries in a transaction."
  [& body]
  `(jdbc/with-db-transaction [tx# *db*]
     (binding [*db* tx#]
       ~@body)))
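For the dump in the question, the same cursor loop can stream straight to a writer. A rough sketch using the shortcuts above (the fetch size and file name are just placeholders):

(with-open [wrtr (clojure.java.io/writer "output.txt")]
  (db/with-tx
    (db/execute! "declare cur cursor for select * from huge_table")
    (loop []
      (when-let [rows (-> "fetch 1000 from cur" db/query not-empty)]
        ;; each row is a map of column keyword -> value
        (doseq [row rows]
          (.write wrtr (str (clojure.string/join "\t" (vals row)) "\n")))
        (recur)))))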
I took the recent fixes for clojure.java.jdbc by putting [org.clojure/java.jdbc "0.3.0-beta1"] in my project.clj dependencies listing. This one enhances/corrects the :as-arrays? true functionality of clojure.java.jdbc/query described here. I think this helped somewhat, although I may still have been able to override the :result-set-fn with vec.
The core issue was resolved by tucking all row logic into :row-fn. The initial OutOfMemory problems had to do with iterating through the j/query result sets rather than defining a specific :row-fn.
New (working) code is below:
(defn -main []
  (let [db-spec local-postgres
        source-sql "select * from public.f_5500 "
        log-report-interval 1000
        fetch-size 1000
        row-count (atom 0)
        field-delim "\u0001" ; unlikely to be in the source feed, although I should still
                             ; check for it in replace-newline below (especially when "\t" is used)
        row-delim "\n"       ; unless fixed-width, the target doesn't support
                             ; non-printable chars for the record delimiter
        db-connection (doto (j/get-connection db-spec) (.setAutoCommit false))
        statement (j/prepare-statement db-connection source-sql
                                       :fetch-size fetch-size
                                       :concurrency :read-only)
        start (System/currentTimeMillis)
        rate-calc (fn [r] (float (/ r (/ (- (System/currentTimeMillis) start) 100))))
        replace-newline (fn [s] (if (string? s) (clojure.string/replace s #"\n" " ") s))
        row-fn (fn [v]
                 (swap! row-count inc)
                 (when (zero? (mod @row-count log-report-interval))
                   (info (format "wrote %d rows" @row-count))
                   (info (format "\trows/s %.2f" (rate-calc @row-count)))
                   (info (format "\tPercent Mem used %s " (memory-percent-used))))
                 (str (join field-delim (doall (map #(replace-newline %) v))) row-delim))]
    (info "Started database table dump session...")
    (with-open [^java.io.Writer wrtr (io/writer "./sql/output.txt")]
      (j/query db-connection [statement]
               :as-arrays? true
               :row-fn #(.write wrtr (row-fn %))))
    (info (format "\t\t\tCompleted with %d rows" @row-count))
    (info (format "\t\t\tCompleted in %s seconds" (float (/ (- (System/currentTimeMillis) start) 1000))))
    (info (format "\t\t\tAverage rows/s %.2f" (rate-calc @row-count)))
    nil))
Other things I experimented with (with limited success) involved the Timbre logging and turning off standard out; I wondered whether using a REPL might cache the results before displaying them back to my editor (vim-fireplace), and whether that was using a lot of the memory.
Also, I added the logging around free memory with (.freeMemory (java.lang.Runtime/getRuntime)); I wasn't familiar enough with VisualVM to pinpoint exactly where my issue was.
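The memory-percent-used helper called in the dump code isn't shown in the post; a minimal sketch of what it might look like, built on those same Runtime calls:

(defn memory-percent-used
  "Rough percentage of the currently allocated JVM heap that is in use.
  (A sketch of the helper referenced above, not the original implementation.)"
  []
  (let [rt (java.lang.Runtime/getRuntime)
        total (.totalMemory rt)
        free (.freeMemory rt)]
    (format "%.1f%%" (* 100.0 (/ (- total free) total)))))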
I am happy with how it works now, thanks everyone for your help.