Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Using scala to dump result processed by Spark to HDFS

I'm a bit confused to find the right way to save data into HDFS after processing them with spark.

This is what I'm trying to do. I'm calculating min, max and SD of numeric fields. My input files have millions of rows, but output will have only around 15-20 fields. So, the output is a single value(scalar) for each field.

For example: I will load all the rows of FIELD1 into an RDD, and at the end, I will get 3 single values for FIELD 1(MIN, MAX, SD). I concatenated these three values into temporary string. In the end, I will have 15 to twenty rows, containing 4 columns in this following format

FIELD_NAME_1  MIN  MAX  SD
FIELD_NAME_2  MIN  MAX  SD

This is a snippet of the code:

//create rdd
val data = sc.textFile("hdfs://x.x.x.x/"+args(1)).cache()
//just get the first column
val values = data.map(_.split(",",-1)(1))

val data_double= values.map(x=>if(x==""){0}else{x}.toDouble)
val min_value= data_double.map((_,1)).reduceByKey((_+_)).sortByKey(true).take(1)(0)._1
val max_value= data_double.map((_,1)).reduceByKey((_+_)).sortByKey(false).take(1)(0)._1
val SD = data_double.stdev

So, i have 3 variables, min_value, max_value and SD that I want to store back to hdfs.

Question 1: Since the output will be rather small, do I just save it locally on the server? or should I dump it to HDFS. Seems to me like dumping the file locally makes better sense.

Question 2: In spark, I can just call the following to save an RDD into text file

some_RDD.saveAsTextFile("hdfs://namenode/path")

How do I accomplish the same thing in for a String variable that is not an RDD in scala? should I parallelize my result into an RDD first and then call saveAsTextFile?

like image 642
user2773013 Avatar asked Dec 03 '22 19:12

user2773013


1 Answers

To save locally just do

some_RDD.collect()

Then save the resulting array with something like from this question. And yes if the data set is small, and can easily fit in memory you should collect and bring it to the driver of the program. Another option if the data is a little to large to store in memory is just some_RDD.coalesce(numParitionsToStoreOn). Keep in mind coalesce also takes a boolean shuffle, if you are doing calculations on the data before coalescing, you should set this to true to get more parallelism on the calculations. Coalesce will reduce the number of nodes that store data when you call some_RDD.saveAsTextFile("hdfs://namenode/path"). If the file is very small but you need it on hdfs, call repartition(1), which is the same as coalesce(1,true), this will ensure that your data is only saved on one node.

UPDATE: So if all you want to do is save three values in HDFS you can do this. sc.parallelize(List((min_value,max_value,SD)),1).saveAsTextFile("pathTofile")

Basically you are just putting the 3 vars in a tuple, wrap that in a List and set the parallelism to one since the data is very small

like image 105
aaronman Avatar answered Dec 05 '22 09:12

aaronman