I'm pretty new to Spark and the Scala language and would like to union all RDDs in a List as below (List[RDD] to RDD):
val data = for (item <- paths) yield {
  val ad_data_path = item._1
  val ad_data = SparkCommon.sc.textFile(ad_data_path).map { line =>
    val ad_data = new AdData(line)
    (ad_data.ad_id, ad_data)
  }.distinct()
}
val ret = SparkCommon.sc.parallelize(data).reduce(_ ++ _)
I run the code in IntelliJ and always get this error:
java.lang.NullPointerException
at org.apache.spark.rdd.RDD.<init>(RDD.scala:125)
at org.apache.spark.rdd.UnionRDD.<init>(UnionRDD.scala:59)
at org.apache.spark.rdd.RDD.union(RDD.scala:438)
at org.apache.spark.rdd.RDD.$plus$plus(RDD.scala:444)
at data.GenerateData$$anonfun$load_data$1.apply(GenerateData.scala:99)
at data.GenerateData$$anonfun$load_data$1.apply(GenerateData.scala:99)
at scala.collection.TraversableOnce$$anonfun$reduceLeft$1.apply(TraversableOnce.scala:177)
at scala.collection.TraversableOnce$$anonfun$reduceLeft$1.apply(TraversableOnce.scala:172)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:172)
at org.apache.spark.InterruptibleIterator.reduceLeft(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:847)
at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:845)
at org.apache.spark.SparkContext$$anonfun$26.apply(SparkContext.scala:1157)
at org.apache.spark.SparkContext$$anonfun$26.apply(SparkContext.scala:1157)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Does anyone have any idea about the error? Thanks in advance. :)
This might be the cause:
val listA = 1 to 10
for (i <- listA; if i % 2 == 0) yield { i }

will return Vector(2, 4, 6, 8, 10), whereas

for (i <- listA; if i % 2 == 0) yield { val c = i }

will return Vector((), (), (), (), ()).
That is exactly what is happening in your case. You are initializing ad_data inside the yield block but never returning it, so each iteration yields Unit instead of the RDD.
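As a sketch of the fix, reusing the AdData and SparkCommon names from your snippet: make the RDD the last expression of the yield block, and reduce the resulting plain Scala collection on the driver. Note that wrapping the RDDs in sc.parallelize, as your original code does, creates a nested RDD[RDD[...]], which Spark does not support.

val data = for (item <- paths) yield {
  val ad_data_path = item._1
  // the RDD is now the last expression, so yield returns it
  SparkCommon.sc.textFile(ad_data_path).map { line =>
    val ad_data = new AdData(line)
    (ad_data.ad_id, ad_data)
  }.distinct()
}
// reduce the plain Scala list on the driver instead of parallelizing it
val ret = data.reduce(_ ++ _)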
As far as your actual question is concerned, i.e. List[RDD] to RDD, here is the solution:
val listA = sc.parallelize(1 to 10)
val listB = sc.parallelize(10 to 1 by -1)

// create a list of 2 RDDs
val listC = List(listA, listB)

// convert List[RDD] to RDD
val listD = listC.reduce(_ union _)
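As a side note, SparkContext.union accepts a whole sequence of RDDs at once, which builds a single flat UnionRDD instead of the chain of nested unions that the pairwise reduce produces:

// equivalent result, single UnionRDD over all inputs
val listE = sc.union(listC)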
Hope this answers your question.