Spark RDD filter by element class

I have an RDD whose elements are of different types, and I want to count them by type. For example, the code below works correctly.

scala> val rdd = sc.parallelize(List(1, 2.0, "abc"))
rdd: org.apache.spark.rdd.RDD[Any] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd.filter{case z:Int => true; case _ => false}.count
res0: Long = 1

scala> rdd.filter{case z:String => true; case _ => false}.count
res1: Long = 1
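
As an aside, counting every type in one pass also works by mapping each element to its runtime class name (a sketch; the exact keys depend on how the elements are boxed):

// One-pass count by runtime class name; countByValue returns a Map[String, Long].
// Note: Int and Double elements in an RDD[Any] are boxed, so the keys are the
// Java wrapper class names (e.g. "Integer", "Double").
rdd.map(_.getClass.getSimpleName).countByValue()
// expected: something like Map(Integer -> 1, Double -> 1, String -> 1)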

But when the elements are of user-defined types, the code below doesn't work as expected.

scala> class TypeA extends Serializable              // this is the base class
defined class TypeA

scala> case class TypeB(id:Long) extends TypeA       // derived class 1
defined class TypeB

scala> case class TypeC(name:String) extends TypeA   // derived class 2
defined class TypeC

scala> val rdd1 = sc.parallelize(List(TypeB(123), TypeC("jack"), TypeB(456)))   // create an rdd with different types of elements
rdd1: org.apache.spark.rdd.RDD[TypeA with Product] = ParallelCollectionRDD[3] at parallelize at <console>:29

scala> rdd1.count              // total size is correct
res2: Long = 3

scala> rdd1.filter{case z:TypeB => true; case _ => false}.count   // what the hell?
res3: Long = 0

scala> rdd1.filter{case z:TypeC => true; case _ => false}.count   // again ?
res4: Long = 0

scala> rdd1.filter{case z:TypeA => true; case _ => false}.count   // only works for the base class?
res5: Long = 3

Did I miss anything here? Help please!


1 Answer

This looks like a variation of SPARK-1199 and is likely a REPL bug.

Running the same code outside the shell (here locally inside IDEA) yields the expected behavior:

import org.apache.spark.SparkContext

class TypeA extends Serializable                 // base class
case class TypeB(id: Long) extends TypeA         // derived class 1
case class TypeC(name: String) extends TypeA     // derived class 2

// Local SparkContext; "swe" is just the application name.
val sc = new SparkContext("local[*]", "swe")
val rdd = sc.parallelize(List(TypeB(12), TypeC("Hsa")))

// Outside the REPL, the type pattern matches the subclass as expected.
rdd.filter { case x: TypeB => true; case _ => false }.count()

Yields:

import org.apache.spark.SparkContext

defined class TypeA
defined class TypeB
defined class TypeC   

sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@10a1410d
rdd: org.apache.spark.rdd.RDD[TypeA with Product] = ParallelCollectionRDD[0] at parallelize at <console>:18

[Stage 0:>....... (0 + 0) / 4]
res0: Long = 1
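
If you have to stay inside spark-shell, a possible workaround (a sketch, not verified across Spark/Scala versions) is to match on the runtime class name instead of using a type pattern, which sidesteps the REPL's line-wrapper classes:

// Sketch: classes defined in the REPL are compiled as inner classes of
// line-wrapper objects (names like $lineN.$read$...$TypeB), so test the
// tail of the fully qualified name rather than the static type.
// Caveat: endsWith would also match any other class whose name ends in "TypeB".
rdd1.filter(_.getClass.getName.endsWith("TypeB")).count   // expected: 2

Alternatively, moving the definitions out of the REPL entirely (compiling them into a jar on the shell's classpath, or pasting them with :paste -raw in Scala 2.11+) avoids the wrapping, which is effectively what the IDEA run above does.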