
How do I save streaming tweets from twitter-api in Clojure?

I have been using Clojure for a while, but I am unfamiliar with the async I/O that twitter-api (https://github.com/adamwynne/twitter-api) is based on.

I want to collect all tweets matching a given set of keywords, for example everything matching "Mary rose" (something trending in the UK right now). The docs for making streaming calls say to do something like:

(ns mynamespace
  (:use
   [twitter.oauth]
   [twitter.callbacks]
   [twitter.callbacks.handlers]
   [twitter.api.streaming])
  (:require
   [clojure.data.json :as json]
   [http.async.client :as ac]
   [clojure.java.io :as io])
  (:import
   (twitter.callbacks.protocols AsyncStreamingCallback)))

(def my-creds (make-oauth-creds *app-consumer-key*
                                *app-consumer-secret*
                                *user-access-token*
                                *user-access-token-secret*))

; supply a callback that only prints the text of the status
(def ^:dynamic *custom-streaming-callback*
  (AsyncStreamingCallback. (comp println #(:text %) json/read-json #(str %2))
                           (comp println response-return-everything)
                           exception-print))

(statuses-filter :params {:track "mary rose"}
                 :oauth-creds my-creds
                 :callbacks *custom-streaming-callback*)

If I then do something like:

(def mary (statuses-filter :params {:track "mary rose"}
     :oauth-creds my-creds
     :callbacks *custom-streaming-callback*))

I get a map of the http response:

(keys mary)
;; (:id :url :raw-url :status :headers :body :done :error)

I think the body part is the bit that is continually updated:

(class @(:body mary))
;; java.io.ByteArrayOutputStream

I have tried this to save the stream to a file:

(with-open [r @(:body (statuses-filter :params {:track "mary rose"}
    :oauth-creds my-creds
    :callbacks *custom-streaming-callback*))
            w (io/writer "mary.txt")]
  (dosync (.write w (str r "\n")))) 

This writes the first tweet that comes up to the mary.txt file but then closes the connection, presumably because I am using the @ in front of the binding to r (but it chokes if I put the @ in front of the r in the dosync instead). Also, if I do:

@(dosync (:body (statuses-filter :params {:track "mary rose"}
    :oauth-creds my-creds
    :callbacks *custom-streaming-callback*)))

again I only get the first tweet before the connection is closed.
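
One plausible reading of this behaviour can be sketched without the Twitter client at all. The snippet below is only a model, under the assumption that `:body` is an ordinary one-shot promise holding a buffer that keeps growing (which the `ByteArrayOutputStream` class above suggests). The names `body`, `buf`, and `snapshot` are hypothetical stand-ins, not part of twitter-api. Dereferencing unblocks as soon as the first chunk is delivered, and `(str r)` copies only what is in the buffer at that moment, so anything arriving later never reaches the file.

```clojure
(import '(java.io ByteArrayOutputStream))

;; Hypothetical stand-ins for the response body and its backing buffer.
(def body (promise))
(def buf (ByteArrayOutputStream.))

;; The first chunk arrives: the buffer gets data and the promise is delivered.
(.write buf (.getBytes "tweet 1\n" "UTF-8"))
(deliver body buf)

;; This is roughly what the with-open body does: deref once, copy once.
(def snapshot (str @body))

;; A later chunk lands in the same buffer, after the copy was taken.
(.write buf (.getBytes "tweet 2\n" "UTF-8"))

(println (pr-str snapshot))       ; prints "tweet 1\n"
(println (pr-str (str @body)))    ; prints "tweet 1\ntweet 2\n"
```

If that model holds, moving the `@` around cannot help: a single deref yields a single snapshot, however long the connection stays open.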

How can I keep the connection open to continue receiving tweets indefinitely?

dspringate asked Nov 03 '22 21:11


1 Answer

You should put your “write” code into that callback:

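;; Open the writer once and do all writing inside the body-part callback,
;; which runs each time a chunk arrives on the open connection.
(let [w (io/writer "mary.txt")
      callback (AsyncStreamingCallback.
                 ;; called for every chunk received
                 (fn [_resp payload]
                   ;; skip messages without :text (e.g. delete notices)
                   (when-let [text (-> (str payload) json/read-json :text)]
                     (.write w (str text "\n"))
                     ;; flush so tweets reach the file while the stream is open
                     (.flush w)))
                 ;; called when the request fails
                 (fn [_resp]
                   (.close w))
                 ;; called when an exception is thrown
                 (fn [_resp ex]
                   (.close w)
                   (.printStackTrace ex)))]
  (statuses-filter
    :params {:track "mary rose"}
    :oauth-creds my-creds
    :callbacks callback))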
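
One caveat, offered as a sketch rather than as part of twitter-api: a chunk handed to the callback is not guaranteed to be exactly one tweet. As far as I know, the streaming endpoint separates messages with `\r\n` and sends blank keep-alive lines, so a chunk may hold half a message or several. The helper below is hypothetical (`make-line-handler` and `on-line` are names made up for illustration). It buffers partial input and calls `on-line` once per complete, non-blank message.

```clojure
(require '[clojure.string :as str])
(import '(java.io StringWriter))

(defn make-line-handler
  "Returns a function of one chunk (a string). It buffers partial input and
  calls `on-line` once per complete, non-blank line terminated by \\r\\n."
  [on-line]
  (let [pending (atom "")]
    (fn [chunk]
      (let [data  (str @pending chunk)
            ;; limit -1 keeps trailing empty strings, so the last element is
            ;; always the text that follows the final delimiter
            parts (str/split data #"\r\n" -1)]
        ;; hold back the incomplete tail (or "") until the next chunk arrives
        (reset! pending (peek parts))
        (doseq [line (pop parts)
                :when (not (str/blank? line))]
          (on-line line))))))

;; Usage: two chunks that split a message in the middle, plus a keep-alive.
(let [w      (StringWriter.)
      handle (make-line-handler (fn [line] (.write w (str line "\n"))))]
  (handle "{\"text\":\"a\"}\r\n{\"te")
  (handle "xt\":\"b\"}\r\n\r\n")
  (print (str w)))
;; prints:
;; {"text":"a"}
;; {"text":"b"}
```

In the callback above, the body-part function could call such a handler with `(str payload)` and do the JSON parsing and file writing inside `on-line`. The atom is read and then reset without coordination, which assumes chunks for one stream arrive one at a time.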
Niki Tonsky answered Nov 15 '22 12:11