
Spark: How to union a List<RDD> to RDD

I'm pretty new to Spark and the Scala language, and I would like to union all the RDDs in a List as below (List&lt;RDD&gt; to RDD):

val data = for (item <- paths) yield {
    val ad_data_path = item._1
    val ad_data = SparkCommon.sc.textFile(ad_data_path).map { line =>
        val ad_data = new AdData(line)
        (ad_data.ad_id, ad_data)
    }.distinct()
}
val ret = SparkCommon.sc.parallelize(data).reduce(_ ++ _)

I run the code in IntelliJ but always get the following error:

java.lang.NullPointerException
at org.apache.spark.rdd.RDD.<init>(RDD.scala:125)
at org.apache.spark.rdd.UnionRDD.<init>(UnionRDD.scala:59)
at org.apache.spark.rdd.RDD.union(RDD.scala:438)
at org.apache.spark.rdd.RDD.$plus$plus(RDD.scala:444)
at data.GenerateData$$anonfun$load_data$1.apply(GenerateData.scala:99)
at data.GenerateData$$anonfun$load_data$1.apply(GenerateData.scala:99)
at scala.collection.TraversableOnce$$anonfun$reduceLeft$1.apply(TraversableOnce.scala:177)
at scala.collection.TraversableOnce$$anonfun$reduceLeft$1.apply(TraversableOnce.scala:172)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:172)
at org.apache.spark.InterruptibleIterator.reduceLeft(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:847)
at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:845)
at org.apache.spark.SparkContext$$anonfun$26.apply(SparkContext.scala:1157)
at org.apache.spark.SparkContext$$anonfun$26.apply(SparkContext.scala:1157)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Does anyone have any idea about this error? Thanks in advance :)

asked May 25 '15 by juffun


People also ask

How do you convert RDD to another RDD?

Creating from another RDD: you can use transformations like map, flatMap, and filter to create a new RDD from an existing one, e.g. creating a new RDD rdd3 by adding 100 to each record of an existing RDD.
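
For instance, a minimal sketch in the spirit of that snippet (the RDD names and sample data are illustrative; it assumes a live SparkContext named sc, as elsewhere on this page):

val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5))

val rdd2 = rdd.filter(_ % 2 == 0)       // keep even records
val rdd3 = rdd.map(_ + 100)             // add 100 to each record
val rdd4 = rdd.flatMap(n => Seq(n, n))  // emit each record twice

println(rdd3.collect().mkString(", ")) // 101, 102, 103, 104, 105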

Can RDD be shared between Sparkcontexts?

By design, RDDs cannot be shared between different Spark batch applications because each application has its own SparkContext . However, in some cases, the same RDD might be used by different Spark batch applications.
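
There is no API for handing an RDD object from one application to another; the usual pattern, sketched below, is to persist the RDD's data to shared storage and rebuild an equivalent RDD in the other application (the path is illustrative):

// Application A: write the data out to shared storage.
rdd.saveAsTextFile("hdfs:///shared/my_data")

// Application B: rebuild an equivalent RDD from the saved files.
val reloaded = sc.textFile("hdfs:///shared/my_data")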

Can we create RDD from existing RDD?

Creating an RDD from an existing RDD: a transformation produces a new RDD from an existing one (RDDs themselves are immutable), so transformation is the way to create an RDD from an already existing RDD. This is one of the differences between Apache Spark and Hadoop MapReduce. A transformation acts as a function that takes in an RDD and produces a new one.
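
A quick sketch of the immutability point (names are illustrative):

val nums = sc.parallelize(Seq(1, 2, 3))
val doubled = nums.map(_ * 2) // a new RDD; nums itself is never modified

println(nums.collect().mkString(","))    // 1,2,3 -- the source is unchanged
println(doubled.collect().mkString(",")) // 2,4,6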


1 Answer

This might be the cause,

val listA = 1 to 10
for (i <- listA; if i % 2 == 0) yield { i }

will return Vector(2, 4, 6, 8, 10), whereas

for (i <- listA; if i % 2 == 0) yield { val c = i }

will return Vector((), (), (), (), ())

That is exactly what is happening in your case: you initialize ad_data inside the yield block but never return it, so each iteration yields Unit and data ends up as a collection of Unit values instead of RDDs.
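
A sketch of the corrected loop, using the SparkCommon.sc and AdData from the question; the fix is simply to make the RDD the last expression of the yield block:

val data = for (item <- paths) yield {
    val ad_data_path = item._1
    SparkCommon.sc.textFile(ad_data_path).map { line =>
        val ad_data = new AdData(line)
        (ad_data.ad_id, ad_data)
    }.distinct() // this RDD is now the value that yield returns
}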

As for your actual question, i.e. converting a List[RDD] to a single RDD, here is the solution:

val listA = sc.parallelize(1 to 10)
val listB = sc.parallelize(10 to 1 by -1)

creating a list of 2 RDDs

val listC = List(listA,listB)

converting the List[RDD] to a single RDD with a plain Scala reduce:

val listD = listC.reduce(_ union _)
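
As a side note (an alternative, not part of the original answer): SparkContext also provides a union that takes a whole collection of RDDs, which avoids building a deep chain of pairwise UnionRDDs (the name listE is illustrative):

val listE = sc.union(listC)

Either way, the reduce/union runs on the driver over an ordinary Scala List. Distributing the RDDs themselves, as sc.parallelize(data).reduce(_ ++ _) does in the question, cannot work: an RDD cannot be used inside another RDD's tasks, which is consistent with the NullPointerException being thrown from RDD.union inside a running task.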

Hope this answers your question.

answered Nov 15 '22 by Akash