I am looking for an idiomatic way to do the following. I have an HTTP server that responds to a particular GET request with a stream of messages. Since this stream is non-terminating, the call just blocks forever when I use clj-http/get (I am using LightTable). I would like to set up a callback or a core.async-style channel to operate on each message as it comes in. Even writing the stream to a file would be a good first step for me. Any pointers? Here is the call:
(require '[clj-http.client :as client])
(def url "http://hopey.netfonds.no/tradedump.php?date=20150508&paper=AAPL.O&csv_format=txt")
(client/get url)
The date has to be changed to today's date for the data to stream. Thanks!
To just write the stream to a file, a simple approach is clojure.java.io/copy, which takes an input stream, such as the one returned by (:body (client/get some-url {:as :stream})), and an output stream, and copies from one to the other. Something like:
(ns http-stream
  (:require [clj-http.client :as client]
            [clojure.java.io :as io]))

(with-open [in-stream (:body (client/get "http://hopey.netfonds.no/tradedump.php?date=20150508&paper=AAPL.O&csv_format=txt" {:as :stream}))
            out-stream (->> "streamoutput.txt"
                            io/as-file
                            io/output-stream)]
  (io/copy in-stream out-stream))
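Because the stream in the question does not terminate on its own, io/copy only returns once the server closes the connection. If you want to stop after a fixed amount of data, one option is to copy line by line and cap the count. The following is a minimal sketch, not part of clj-http; the name copy-lines and its arguments are made up for illustration.

```clojure
(require '[clojure.java.io :as io])

(defn copy-lines
  "Hypothetical helper: copy at most n lines from `in` (anything io/reader
  accepts, e.g. an InputStream) to `out` (anything io/writer accepts, e.g. a
  file name). Returns the number of lines written."
  [in out n]
  (with-open [rdr (io/reader in)
              wtr (io/writer out)]
    ;; line-seq is lazy and unchunked, so take stops reading after n lines.
    (let [lines (take n (line-seq rdr))]
      (doseq [line lines]
        (.write wtr (str line "\n")))
      (count lines))))

;; Assumed usage with the stream from above:
;; (copy-lines (:body (client/get url {:as :stream})) "streamoutput.txt" 1000)
```

Closing the reader at the end of with-open also closes the underlying HTTP body stream.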
That gave me several thousand lines of tab-separated values over a couple of seconds. Now, to process them with core.async at the level of lines, we probably want to process the stream a bit more using a reader and a line-seq:
(ns http-stream
  (:require [clj-http.client :as client]
            [clojure.core.async :as async]
            [clojure.java.io :as io]
            [clojure.string :as str]))

(defn trades-chan
  "Open the URL as a stream of trades information. Return a channel of the trades, represented as strings."
  [dump-url]
  (let [lines (-> dump-url
                  (client/get {:as :stream})
                  :body
                  io/reader
                  line-seq)] ;; A lazy seq of each line in the stream.
    (async/to-chan lines))) ;; Return a channel which outputs the lines.

;; Example: Print the first 250 lines.
(let [a (trades-chan "http://hopey.netfonds.no/tradedump.php?date=20150508&paper=AAPL.O&csv_format=txt")]
  (async/go-loop [takes 250]
    (when (< 0 takes)
      (println (async/<! a))
      (recur (dec takes)))))
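One caveat about feeding a line-seq to a channel this way: reading the next line is blocking I/O, and go blocks share a small fixed thread pool, so the core.async documentation advises against blocking inside them. A possible alternative, sketched here under the assumption that one dedicated thread per stream is acceptable, is async/thread with the blocking put >!!. The name lines-chan and the buf-size parameter are made up for illustration.

```clojure
(require '[clojure.core.async :as async]
         '[clojure.java.io :as io])

(defn lines-chan
  "Hypothetical helper: read lines from `in` on a dedicated thread and put
  them on a channel with a buffer of buf-size. The channel is closed at end of
  input; the reader is closed at end of input or once the consumer has closed
  the channel."
  [in buf-size]
  (let [out (async/chan buf-size)]
    (async/thread
      (with-open [rdr (io/reader in)]
        (loop [lines (line-seq rdr)]
          (when-let [line (first lines)]
            ;; >!! blocks while the buffer is full and returns false once
            ;; the channel has been closed.
            (when (async/>!! out line)
              (recur (rest lines))))))
      (async/close! out))
    out))

;; Assumed usage: (lines-chan (:body (client/get url {:as :stream})) 50)
```

The consumer can stop the reader thread by calling async/close! on the returned channel; the thread notices on its next put.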
Now, a trades-chan like this is enough to get started, but I notice that the stream always starts with a line describing the columns:
time price quantity board source buyer seller initiator
You can use that to improve things a little. In particular, the header is enough information to build a transducer for trades-chan that turns each trade into a more convenient format to work with, such as a map. We also likely want a way to stop taking elements and close the connection at some point. I'm not that familiar with core.async myself, but this seems to work:
(defn trades-chan
  "Open the URL as a tab-separated values stream of trades.
  Returns a core.async channel of the trades, represented as maps.
  Closes the HTTP stream on channel close!"
  [dump-url]
  (let [stream (-> dump-url
                   (client/get {:as :stream})
                   :body)
        lines (-> stream
                  io/reader
                  line-seq) ;; A lazy seq of each line in the stream.
        fields (map keyword (str/split (first lines) #"\t")) ;; (:time :price :quantity ...
        ;; A transducer that splits strings on tab and makes them into maps with keys from fields.
        transducer (map (comp #(zipmap fields %) #(str/split % #"\t")))
        output-chan (async/chan 50 transducer)]
    (async/go-loop [my-lines (drop 1 lines)]
      (if-let [line (first my-lines)]
        (if (async/>! output-chan line) ;; If we managed to put
          (recur (rest my-lines))       ;; then the chan is not closed. Recur with the rest of the lines.
          (.close stream))              ;; else close the HTTP stream.
        (do (async/close! output-chan)  ;; End of input: nil cannot be put on a channel,
            (.close stream))))          ;; so close the channel and the HTTP stream instead.
    output-chan))
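To see what the transducer produces without opening a connection, the same idea can be tried on in-memory data. This is only an illustrative sketch: row-parser is a made-up name, and the sample rows are invented, not real output from the server.

```clojure
(require '[clojure.string :as str])

(defn row-parser
  "Hypothetical helper: given the tab-separated header line, return a
  transducer that turns each data line into a map keyed by column name."
  [header-line]
  (let [fields (map keyword (str/split header-line #"\t"))]
    (map (fn [line] (zipmap fields (str/split line #"\t"))))))

;; Invented sample data with only three of the columns.
(def sample-lines
  ["time\tprice\tquantity"
   "20150508T153000\t127.5\t100"
   "20150508T153001\t127.6\t200"])

(def parsed
  (into [] (row-parser (first sample-lines)) (rest sample-lines)))
;; parsed is a vector of maps such as
;; {:time "20150508T153000", :price "127.5", :quantity "100"}
```

All values stay strings, so numeric columns still need parsing. Also note that str/split drops trailing empty strings by default, so a row whose last columns are empty yields a map without those keys; passing a limit of -1 as a third argument to str/split keeps them.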
I think user1571406's answer is reasonable and gives a good introduction to combining clj-http with core.async. However, if you are not tied to clj-http, I would strongly recommend the http-kit library, which is designed more with asynchronous response handling in mind. Using http-kit, you can write your callback as follows.
user> (require '[clojure.java.io :as io]
               '[org.httpkit.client :as h])
nil
user> (def url "http://hopey.netfonds.no/tradedump.php?date=20150508&paper=AAPL.O&csv_format=txt")
#'user/url
user> (h/get url {:as :stream}
             (fn [{:keys [status body]}]
               (if (= status 200)
                 (with-open [out (io/output-stream "/tmp/output.txt")]
                   (io/copy body out)))))
#<core$promise$reify__6363@373b22df: :pending>
The last h/get function call returns immediately, and its callback fn writes the body of the response to the file /tmp/output.txt asynchronously.