I would like to convert a DStream into an array, list, etc. so I can then translate it to JSON and serve it on an endpoint. I'm using Apache Spark and ingesting Twitter data. How do I perform this operation on the DStream statuses? I can't seem to get anything to work other than print().
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._
import TutorialHelper._

object Tutorial {
  def main(args: Array[String]) {
    // Location of the Spark directory
    val sparkHome = "/opt/spark"
    // URL of the Spark cluster
    val sparkUrl = "local[8]"
    // Location of the required JAR files
    val jarFile = "target/scala-2.10/tutorial_2.10-0.1-SNAPSHOT.jar"
    // HDFS directory for checkpointing
    val checkpointDir = "/tmp"

    // Configure Twitter credentials using twitter.txt
    TutorialHelper.configureTwitterCredentials()

    val ssc = new StreamingContext(sparkUrl, "Tutorial", Seconds(1), sparkHome, Seq(jarFile))
    val filters = Array("#americasgottalent", "iamawesome")
    val tweets = TwitterUtils.createStream(ssc, None, filters)
    val statuses = tweets.map(status => status.getText())

    val arr = Array("firstval")
    statuses.foreachRDD {
      // My failed attempt: Array's :+ returns a NEW array, and the
      // result is discarded here, so arr never changes.
      arr :+ _.collect()
    }

    ssc.checkpoint(checkpointDir)
    ssc.start()
    ssc.awaitTermination()
  }
}
Some background first. A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous sequence of RDDs (of the same type) representing a continuous stream of data (see org.apache.spark.rdd.RDD for more details on RDDs). DStreams support the familiar RDD-style transformations, for example: 1. map(func) — returns a new DStream by passing each element of the source DStream through a function func. 2. flatMap(func) — similar to map, but each input item can be mapped to 0 or more output items. Note the conversion only goes one way: each batch of a DStream is handed to you as an RDD, but as far as I know a standalone RDD cannot be converted into a DStream, because an RDD is a fixed collection of data while a DStream represents continuously arriving data. If you want streaming input (for example for StreamingKMeans), feed the data in through a streaming source such as KafkaUtils instead of building it up as an RDD.
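As a minimal sketch of those two transformations (reusing the tweets stream from the question's code, so the names are carried over from there, not new API):

val statuses = tweets.map(status => status.getText())    // DStream[String]: one text per tweet
val words    = statuses.flatMap(text => text.split(" ")) // DStream[String]: zero or more words per text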
If your DStream is statuses, you can do:
import scala.collection.mutable.ArrayBuffer

val arr = new ArrayBuffer[String]()
statuses.foreachRDD { rdd =>
  // Collect this batch to the driver and append it; you can now
  // put it in an array or do whatever you want with it.
  arr ++= rdd.collect()
}
Keep in mind this could end up being far more data than you want in your driver, since a DStream can be huge.
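If that worries you, one option (a sketch of my own, not something the question requires; maxKept is a made-up cap) is to keep only the most recent statuses so the driver-side buffer stays bounded:

import scala.collection.mutable.ArrayBuffer

val maxKept = 1000 // hypothetical cap; tune to taste
val arr = new ArrayBuffer[String]()
statuses.foreachRDD { rdd =>
  arr ++= rdd.collect()
  // Drop the oldest entries once the buffer exceeds the cap.
  if (arr.length > maxKept) arr.remove(0, arr.length - maxKept)
}

Also note that foreachRDD runs on the driver as each batch arrives, so if another thread (say, your endpoint handler) reads arr concurrently, you'll want to synchronize access to it.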
Turns out you were close, but what I ended up looking for is:
statuses.foreachRDD(rdd => {
  // collect() already returns an Array, so no toArray call is needed
  for (item <- rdd.collect()) {
    println(item)
  }
})
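And to close the loop on the original goal (serving the statuses as JSON from an endpoint), here is a minimal sketch; the LatestBatch holder and the hand-rolled escaping are my own assumptions, not part of the answers above:

object LatestBatch {
  // Written by the streaming job, read by the HTTP endpoint handler.
  @volatile var json: String = "[]"
}

statuses.foreachRDD { rdd =>
  val batch = rdd.collect()
  // Naive hand-rolled JSON array of strings; a real app would use a JSON library.
  LatestBatch.json = batch
    .map(s => "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"")
    .mkString("[", ",", "]")
}

Your endpoint can then simply return LatestBatch.json on each request.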