I have a Spark Streaming environment with Spark 1.2.0 where I retrieve data from a local folder, and every time a new file is added to the folder I perform some transformations:
val ssc = new StreamingContext(sc, Seconds(10))
val data = ssc.textFileStream(directory)
In order to perform my analysis on the DStream data I have to transform it into an Array:
import scala.collection.mutable.ArrayBuffer

var arr = new ArrayBuffer[String]()
// collect every batch back to the driver and append it to the buffer
data.foreachRDD { rdd =>
  arr ++= rdd.collect()
}
Then I use the data obtained to extract the information I want and save it to HDFS:
val myRDD = sc.parallelize(arr)
myRDD.saveAsTextFile("hdfs directory....")
Since I really need to manipulate the data as an Array, it's impossible to save it to HDFS with DStream.saveAsTextFiles("...") (which would work fine), so I have to save the RDD instead. But with this procedure I end up with empty output files named part-00000 etc.
With arr.foreach(println) I am able to see the correct results of the transformations.
My suspicion is that at every batch Spark tries to write the data to the same files, deleting what was previously written. I tried saving to a dynamically named folder like myRDD.saveAsTextFile("folder" + System.currentTimeMillis().toString()), but only one folder is ever created and the output files are still empty.
How can I write an RDD to HDFS in a Spark Streaming context?
You are using Spark Streaming in a way it wasn't designed for. I'd recommend either dropping Spark for your use case, or adapting your code so it works the Spark way. Collecting the array to the driver defeats the purpose of using a distributed engine and makes your app effectively single-machine (two machines will also cause more overhead than just processing the data on a single machine).
Everything you can do with an array, you can do with Spark. So just run your computations inside the stream, distributed on the workers, and write your output using DStream.saveAsTextFiles().
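For instance, here is a minimal sketch of that approach. The extractInfo function is a hypothetical stand-in for whatever you currently compute from the Array, and the input/output paths are placeholders:

import org.apache.spark.streaming.{Seconds, StreamingContext}

// hypothetical stand-in for the transformation you currently apply to the Array
def extractInfo(line: String): String = line.toUpperCase

val directory = "file:///path/to/input"         // the local folder being watched, as in the question
val ssc = new StreamingContext(sc, Seconds(10)) // sc is the existing SparkContext
val data = ssc.textFileStream(directory)

// run the transformation on the DStream itself, distributed on the workers
val transformed = data.map(extractInfo)

// Spark writes one output directory per batch, e.g. .../output-<batch time in ms>
transformed.saveAsTextFiles("hdfs:///user/me/output")

ssc.start()
ssc.awaitTermination()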
Alternatively, you can use foreachRDD + saveAsParquet(path, overwrite = true) to write to a single file.
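If you prefer to control the writing yourself, a rough sketch of that per-batch variant looks like this, using plain saveAsTextFile here instead of the Parquet call and a directory named after the batch time so nothing gets overwritten (again reusing the hypothetical extractInfo):

// replaces the saveAsTextFiles call above; nothing is collected back to the driver
data.foreachRDD { (rdd, time) =>
  rdd.map(extractInfo).saveAsTextFile("hdfs:///user/me/output-" + time.milliseconds)
}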