I get a warning when using an RDD in a for comprehension, and I'm not sure if it's something I'm doing wrong. If I do this:
val sc = new SparkContext(...)
val anRDD = sc.parallelize(List(
  ("a", List(1, 2, 3)),
  ("b", List(4)),
  ("c", List(5, 6))
))
for {
  (someString, listOfInts) <- anRDD
  someInt <- listOfInts
} yield (someString, someInt)
Then I get this output:
warning: `withFilter' method does not yet exist on org.apache.spark.rdd.RDD[(String, List[Int])], using `filter' method instead
(someString, listOfInts) <- anRDD
But it does still successfully return a FlatMappedRDD[(String, Int)]. Am I doing something wrong? Or is it safe to ignore this warning?
Update: I would also accept as an answer an explanation of how the for-comprehension converts these operations into map/flatMap/filter calls, since I didn't think there would be any filter or withFilter calls required. I assumed it would be equivalent to something like this:
anRDD.flatMap(tuple => tuple._2.map(someInt => (tuple._1, someInt)))
But this doesn't include any filter or withFilter calls, which seems to be the source of the warning.
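Writing out what the warning seems to imply, my guess (not verified) at the actual rewrite is something more like this, using the same names:
anRDD
  .filter { case (someString, listOfInts) => true }  // guess: the pattern on the left of the first generator adds this step
  .flatMap { case (someString, listOfInts) =>
    listOfInts.map(someInt => (someString, someInt))
  }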
Oh, I'm using Spark 1.2.0, Scala 2.10.4, and this is all within the REPL.
First, I am no expert, but have done some digging and here is what I have found:
I compiled the code using scalac's -print option (since JavaDecompiler was failing for some reason), which prints out the program with all Scala-specific features removed. There, I saw:
test.this.anRDD().filter({
(new anonymous class anonfun$1(): Function1)
}).flatMap({
(new anonymous class anonfun$2(): Function1)
}, ClassTag.apply(classOf[scala.Tuple2]));
You will notice the filter, so I checked on the anonfun$1:
public final boolean apply(Tuple2<String, List<Object>> check$ifrefutable$1)
{
  Tuple2 localTuple2 = check$ifrefutable$1;
  boolean bool;
  if (localTuple2 != null) {
    bool = true;
  } else {
    bool = false;
  }
  return bool;
}
So, if you put all of this together, it seems that the filter is happening in the comprehension because it is filtering out anything that is NOT a Tuple2.
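In source terms, that anonymous function boils down to roughly this predicate (a sketch of what the decompiled apply above does, not the literal generated code):
// Keep the element as long as it is a non-null Tuple2, i.e. the
// (someString, listOfInts) pattern cannot fail to match it.
val patternCheck: ((String, List[Int])) => Boolean = t => t != null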
And the preference is to use withFilter instead of filter (not sure why at the moment; presumably because withFilter is lazy and avoids building an intermediate collection). You can see that by decompiling a regular List instead of an RDD:
object test {
  val regList = List(
    ("a", List(1, 2, 3)),
    ("b", List(4)),
    ("c", List(5, 6))
  )
  val foo = for {
    (someString, listOfInts) <- regList
    someInt <- listOfInts
  } yield (someString, someInt)
}
Which decompiles to:
test.this.regList().withFilter({
(new anonymous class anonfun$1(): Function1)
}).flatMap({
(new anonymous class anonfun$2(): Function1)
}, immutable.this.List.canBuildFrom()).$asInstanceOf[List]();
So, it is the same thing, except it uses withFilter where it can.
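And if the warning itself bothers you, one way to sidestep it (a sketch using the names from the question) is to drop the pattern from the generator and destructure inside flatMap, so there is nothing for the compiler to filter on:
// No for-comprehension here, so no withFilter/filter step is generated and no warning appears.
val pairs = anRDD.flatMap { case (someString, listOfInts) =>
  listOfInts.map(someInt => (someString, someInt))
}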