I'm a bit confused to find the right way to save data into HDFS after processing them with spark.
This is what I'm trying to do. I'm calculating min, max and SD of numeric fields. My input files have millions of rows, but output will have only around 15-20 fields. So, the output is a single value(scalar) for each field.
For example: I will load all the rows of FIELD1 into an RDD, and at the end, I will get 3 single values for FIELD 1(MIN, MAX, SD). I concatenated these three values into temporary string. In the end, I will have 15 to twenty rows, containing 4 columns in this following format
FIELD_NAME_1 MIN MAX SD
FIELD_NAME_2 MIN MAX SD
This is a snippet of the code:
//create rdd
val data = sc.textFile("hdfs://x.x.x.x/"+args(1)).cache()
//just get the first column
val values = data.map(_.split(",",-1)(1))
val data_double= values.map(x=>if(x==""){0}else{x}.toDouble)
val min_value= data_double.map((_,1)).reduceByKey((_+_)).sortByKey(true).take(1)(0)._1
val max_value= data_double.map((_,1)).reduceByKey((_+_)).sortByKey(false).take(1)(0)._1
val SD = data_double.stdev
So, i have 3 variables, min_value, max_value and SD that I want to store back to hdfs.
Question 1: Since the output will be rather small, do I just save it locally on the server? or should I dump it to HDFS. Seems to me like dumping the file locally makes better sense.
Question 2: In spark, I can just call the following to save an RDD into text file
some_RDD.saveAsTextFile("hdfs://namenode/path")
How do I accomplish the same thing in for a String variable that is not an RDD in scala? should I parallelize my result into an RDD first and then call saveAsTextFile?
To save locally just do
some_RDD.collect()
Then save the resulting array with something like from this question. And yes if the data set is small, and can easily fit in memory you should collect and bring it to the driver of the program. Another option if the data is a little to large to store in memory is just some_RDD.coalesce(numParitionsToStoreOn)
. Keep in mind coalesce
also takes a boolean shuffle
, if you are doing calculations on the data before coalescing, you should set this to true to get more parallelism on the calculations. Coalesce will reduce the number of nodes that store data when you call some_RDD.saveAsTextFile("hdfs://namenode/path")
. If the file is very small but you need it on hdfs, call repartition(1)
, which is the same as coalesce(1,true)
, this will ensure that your data is only saved on one node.
UPDATE:
So if all you want to do is save three values in HDFS you can do this.
sc.parallelize(List((min_value,max_value,SD)),1).saveAsTextFile("pathTofile")
Basically you are just putting the 3 vars in a tuple, wrap that in a List and set the parallelism to one since the data is very small
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With