
Warning while using RDD in for comprehension

I get a warning when using an RDD in a for comprehension, and I'm not sure if it's something I'm doing wrong. If I do this:

val sc = new SparkContext(...)

val anRDD = sc.parallelize(List(
  ("a", List(1, 2, 3)),
  ("b", List(4)),
  ("c", List(5, 6))
))

for {
  (someString, listOfInts) <- anRDD
  someInt <- listOfInts
} yield (someString, someInt)

Then I get this output:

 warning: `withFilter' method does not yet exist on org.apache.spark.rdd.RDD[(String, List[Int])], using `filter' method instead
  (someString, listOfInts) <- anRDD

But it does still successfully return a FlatMappedRDD[(String, Int)]. Am I doing something wrong? Or is it safe to ignore this warning?

Update: I would also accept as an answer an explanation of how the for-comprehension converts these operations to map/flatMap/filter calls, since I didn't think there would be any filter or withFilter calls required. I assumed it would be equivalent to something like this:

anRDD.flatMap(tuple => tuple._2.map(someInt => (tuple._1, someInt)))

But this doesn't include any filter or withFilter calls, which seems to be the source of the warning.

Oh, I'm using Spark 1.2.0, Scala 2.10.4, and this is all within the REPL.

asked Jan 20 '15 by jayhutfles




1 Answer

First, I am no expert, but I have done some digging, and here is what I found:

I compiled the code with scalac's -print flag (since Java Decompiler was failing for some reason), which prints the program with all Scala-specific features removed. There, I saw:

test.this.anRDD().filter({
    (new anonymous class anonfun$1(): Function1)
  }).flatMap({
    (new anonymous class anonfun$2(): Function1)
  }, ClassTag.apply(classOf[scala.Tuple2]));

You will notice the filter... so I checked anonfun$1:

public final boolean apply(Tuple2<String, List<Object>> check$ifrefutable$1)
  {
    Tuple2 localTuple2 = check$ifrefutable$1;
    boolean bool;
    if (localTuple2 != null) {
      bool = true;
    } else {
      bool = false;
    }
    return bool;
  }

So, if you put all of this together, it seems that the filter is happening in the comprehension because it is filtering out anything that is NOT a Tuple2.
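
Put back into source form, the translation looks roughly like this (a hand-written sketch; the real output uses the anonymous classes shown above, with the null check standing in for the tuple-pattern test):

// Sketch of what the for-comprehension desugars to: the tuple pattern in
// the first generator makes the compiler insert a filter step (it would
// prefer withFilter, but RDD does not define one) before the flatMap/map.
anRDD
  .filter(pair => pair != null)  // the Tuple2 check from anonfun$1
  .flatMap { case (someString, listOfInts) =>
    listOfInts.map(someInt => (someString, someInt))
  }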

And the preference is to use withFilter instead of filter wherever it exists: withFilter is lazy, applying the predicate on the fly instead of building an intermediate collection. You can see that by decompiling the same comprehension over a regular List instead of an RDD:

object test {
  val regList = List(
    ("a", List(1, 2, 3)),
    ("b", List(4)),
    ("c", List(5, 6))
  )

  val foo = for {
    (someString, listOfInts) <- regList
    someInt <- listOfInts
  } yield (someString, someInt)
}

Which decompiles to:

test.this.regList().withFilter({
  (new anonymous class anonfun$1(): Function1)
}).flatMap({
  (new anonymous class anonfun$2(): Function1)
}, immutable.this.List.canBuildFrom()).$asInstanceOf[List]();

So it is the same translation, except it uses withFilter where it can.
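
To make the laziness concrete, here is a minimal sketch with plain collections (nothing Spark-specific): filter eagerly allocates an intermediate collection, while withFilter just wraps the original and applies the predicate during the subsequent map.

val xs = List(1, 2, 3, 4)

// filter builds an intermediate List(2, 4) before map runs
val eager = xs.filter(_ % 2 == 0).map(_ * 10)

// withFilter builds no intermediate collection; the predicate is
// checked lazily as map traverses the original list
val lazier = xs.withFilter(_ % 2 == 0).map(_ * 10)

// Both evaluate to List(20, 40)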

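As an aside (my own sketch, following the translation rules rather than anything in the decompiled output): the compiler only inserts the filter to test the tuple pattern, so binding the tuple to a plain variable, which is irrefutable, makes the warning disappear:

// No pattern in the first generator, so no withFilter/filter is generated
for {
  pair    <- anRDD
  someInt <- pair._2
} yield (pair._1, someInt)
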
answered Sep 20 '22 by Justin Pihony