
Apache Spark: Splitting Pair RDD into multiple RDDs by key to save values

I am using Spark 1.0.1 to process a large amount of data. Each row contains an ID number, and some rows share the same ID. I want to save all the rows with the same ID number in the same location, but I am having trouble doing it efficiently. I create an RDD[(String, String)] of (ID number, data row) pairs:

// Key each row by its ID field (the second tab-separated column).
val mapRdd = rdd.map { x => (x.split("\\t+")(1), x) }

A way that works, but is not performant, is to collect the ID numbers, filter the RDD for each ID, and save the RDD of values with the same ID as a text file.

val ids = mapRdd.keys.distinct.collect
ids.foreach { id =>
  val dataRows = mapRdd.filter(_._1 == id).values
  dataRows.saveAsTextFile(id)
}

I also tried groupByKey or reduceByKey so that each tuple in the RDD contains a unique ID number as the key and a single string of that ID's data rows, separated by newlines. I want to iterate through the RDD only once using foreach to save the data, but the values cannot be turned into an RDD inside the loop.
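A sketch of the combining step described above (assuming the mapRdd of (ID, row) pairs defined earlier); the failing foreach attempt follows it:

// Concatenate all rows that share an ID into one newline-separated string per key.
val groupedRdd = mapRdd.reduceByKey(_ + "\n" + _)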

groupedRdd.foreach { tup =>
  val data = sc.parallelize(List(tup._2)) // nested RDD does not work
  data.saveAsTextFile(tup._1)
}

Essentially, I want to split an RDD into multiple RDDs by an ID number and save the values for that ID number into their own location.

asked Jul 30 '14 by smli



1 Answer

I think this problem is similar to "Write to multiple outputs by key Spark - one Spark job". Please refer to the answer there.

import org.apache.hadoop.io.NullWritable
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  // Drop the key from the output records; only the value (the data row) is written.
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()

  // Use the key as the output file name, so each ID gets its own file.
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String]
}

object Split {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Split" + args(1))
    val sc = new SparkContext(conf)
    sc.textFile("input/path")
      .map(a => (k, v)) // Your own implementation
      .partitionBy(new HashPartitioner(num)) // num: the number of partitions you want
      .saveAsHadoopFile("output/path", classOf[String], classOf[String],
        classOf[RDDMultipleTextOutputFormat])
    sc.stop()
  }
}

Just saw a similar answer above, but we actually don't need customized partitioners. MultipleTextOutputFormat will create a file for each key, so it is fine for records with different keys to fall into the same partition.

new HashPartitioner(num), where num is the number of partitions you want. If you have a large number of distinct keys, you can set num to a large value; that way each partition will not have to open too many HDFS file handles.
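As a rough sizing sketch (the key count and partition number here are hypothetical, and mapRdd is the (ID, row) pair RDD from the question):

// Hypothetical sizing: with ~50,000 distinct IDs spread over 500 partitions,
// each task sees ~100 keys and therefore keeps ~100 HDFS writers open at a
// time, rather than one per distinct ID.
val partitioned = mapRdd.partitionBy(new HashPartitioner(500))
partitioned.saveAsHadoopFile("output/path", classOf[String], classOf[String],
  classOf[RDDMultipleTextOutputFormat])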

answered Oct 13 '22 by zhang zhan