I have been using clojure for a while but I am unfamiliar with async io that the twitter-api (https://github.com/adamwynne/twitter-api) is based on.
I want to collect all tweets matching a given set of keywords. For example everything matching "Mary rose" (something trending in the UK right now). The docs for making streaming calls say to do something like:
(ns mynamespace
(:use
[twitter.oauth]
[twitter.callbacks]
[twitter.callbacks.handlers]
[twitter.api.streaming])
(:require
[clojure.data.json :as json]
[http.async.client :as ac]
[clojure.java.io :as io])
(:import
(twitter.callbacks.protocols AsyncStreamingCallback)))
(def my-creds (make-oauth-creds *app-consumer-key*
*app-consumer-secret*
*user-access-token*
*user-access-token-secret*))
; supply a callback that only prints the text of the status
(def ^:dynamic
*custom-streaming-callback*
(AsyncStreamingCallback. (comp println #(:text %) json/read-json #(str %2))
(comp println response-return-everything)
exception-print))
(statuses-filter :params {:track "mary rose"}
:oauth-creds my-creds
:callbacks *custom-streaming-callback*)
If I then do something like:
(def mary (statuses-filter :params {:track "mary rose"}
:oauth-creds my-creds
:callbacks *custom-streaming-callback*))
I get a map of the http response:
(keys mary)
;; (:id :url :raw-url :status :headers :body :done :error)
I think the body part is the bit that is continually updated:
(class @(:body mary))
;; java.io.ByteArrayOutputStream
and have tried this to save the stream to a file:
(with-open [r @(:body (statuses-filter :params {:track "mary rose"}
:oauth-creds my-creds
:callbacks *custom-streaming-callback*))
w (io/writer "mary.txt")]
(dosync (.write w (str r "\n"))))
This writes the first tweet that comes up to the mary.txt file but then closes the connection - presumably because I am using the @ in front of the binding to r (but it chokes if I put the @ in front of the r in the desync instead. Also, if I do:
@(dosync (:body (statuses-filter :params {:track "mary rose"}
:oauth-creds my-creds
:callbacks *custom-streaming-callback*)))
again I only get the first tweet before the connection is closed.
How can I keep the connection open to continue receiving tweets indefinitely?
You should put your “write” code into that callback:
(let [w (io/writer "mary.txt")
callback (AsyncStreamingCallback.
(fn [_resp payload]
(.write w (-> (str payload) json/read-json :text))
(.write w "\n"))
(fn [_resp]
(.close w))
(fn [_resp ex]
(.close w)
(.printStackTrace ex)))]
(statuses-filter
:params {:track "mary rose"}
:oauth-creds my-creds
:callbacks callback))
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With