Spark: RDD to List

I have an RDD with the structure

RDD[(String, String)]

and I want to create two Lists (one for each dimension of the RDD).

I tried to use rdd.foreach() to fill two ListBuffers and then convert them to Lists, but I guess each node creates its own ListBuffer, because after the iteration the ListBuffers are empty. How can I do it?

EDIT: my approach

import scala.collection.mutable.ListBuffer

val labeled = data_labeled.map { line =>
  val parts = line.split(',')
  (parts(5), parts(7))
}.cache()

var testList: ListBuffer[String] = new ListBuffer()

labeled.foreach(line =>
  testList += line._1
)
val labeledList = testList.toList
println("rdd: " + labeled.count)
println("bufferList: " + testList.size)
println("list: " + labeledList.size)

and the result is:

rdd: 31990654
bufferList: 0
list: 0
asked Nov 30 '16 by bill


2 Answers

If you really want to create two Lists - meaning, you want all the distributed data collected into the driver application (risking slowness or an OutOfMemoryError) - you can use collect and then apply simple map operations on the result:

val list: List[(String, String)] = rdd.collect().toList
val col1: List[String] = list.map(_._1)
val col2: List[String] = list.map(_._2)

Alternatively - if you want to "split" your RDD into two RDDs - it's pretty similar without collecting the data:

rdd.cache() // to make sure calculation of rdd is not repeated twice
val rdd1: RDD[String] = rdd.map(_._1)
val rdd2: RDD[String] = rdd.map(_._2)

A third alternative is to first map into these two RDDs and then collect each of them, but it's not much different from the first option and suffers from the same risks and limitations.
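For completeness, a minimal sketch of that third option (assuming the same rdd: RDD[(String, String)] as above) might look like this; note that both collect calls still pull all the data into the driver:

```scala
// Map first, then collect each projection.
// Without cache(), each collect() would recompute the source RDD's lineage.
rdd.cache()
val col1: List[String] = rdd.map(_._1).collect().toList
val col2: List[String] = rdd.map(_._2).collect().toList
```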

answered Sep 28 '22 by Tzach Zohar


As an alternative to Tzach Zohar's answer, you can collect and then unzip the resulting list of pairs:

scala> val myRDD = sc.parallelize(Seq(("a", "b"), ("c", "d")))
myRDD: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> val (l1, l2) = myRDD.collect.toList.unzip
l1: List[String] = List(a, c)
l2: List[String] = List(b, d)

Or use keys and values on the RDD:

scala> val (rdd1, rdd2) = (myRDD.keys, myRDD.values)
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at keys at <console>:33
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at values at <console>:33

scala> rdd1.foreach{println}
a
c

scala> rdd2.foreach{println}
d
b

(Note that foreach runs on the executors, so the printed order is not deterministic, as the second output shows; in cluster mode the output goes to the executors' stdout rather than the driver's.)
answered Sep 28 '22 by evan.oman