Say, for example, I have a simple case class:

case class Foo(k: String, v1: String, v2: String)

Can I get Spark to recognise this as a tuple for the purposes of something like the following, without converting to a tuple in, say, a map or keyBy step?
val rdd = sc.parallelize(List(Foo("k", "v1", "v2")))
// Swap values
rdd.mapValues(v => (v._2, v._1))
I don't even care if it loses the original case class after such an operation. I've tried the following, with no luck. I'm fairly new to Scala; am I missing something?
case class Foo(k: String, v1: String, v2: String)
  extends Tuple2[String, (String, String)](k, (v1, v2))
Edit: in the snippet above the case class extends Tuple2, but this does not produce the desired effect: the RDD class and functions still do not treat it like a tuple, so the PairRDDFunctions operations such as mapValues, values, reduceByKey, etc. are unavailable.
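For reference, the map/keyBy-style workaround the question wants to avoid can be sketched without Spark; in this sketch a plain List stands in for the RDD, and ordinary map calls stand in for the RDD's map and mapValues:

```scala
// Plain-Scala sketch of the explicit-conversion workaround; List stands
// in for the RDD, so this runs without Spark.
case class Foo(k: String, v1: String, v2: String)

val data = List(Foo("k", "v1", "v2"))

// Explicit conversion to pairs, as rdd.map(f => (f.k, (f.v1, f.v2))) would do:
val pairs = data.map(f => f.k -> (f.v1, f.v2))

// The "swap values" step, as mapValues(v => (v._2, v._1)) would do:
val swapped = pairs.map { case (k, (a, b)) => k -> (b, a) }
// swapped == List(("k", ("v2", "v1")))
```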
When creating a pair RDD from an in-memory collection in Scala or Python, we only need to call SparkContext.parallelize() on a collection of pairs. To create a pair RDD in Java from an in-memory collection, we instead use SparkContext.parallelizePairs().
Creating a pair RDD using the first word as the key in Java:

PairFunction<String, String, String> keyData =
  new PairFunction<String, String, String>() {
    public Tuple2<String, String> call(String x) {
      return new Tuple2(x.split(" ")[0], x);
    }
  };
JavaPairRDD<String, String> pairs = lines.mapToPair(keyData);
Pair RDD function examples: distinct – returns an RDD with duplicate elements removed; sortByKey – transformation that returns an RDD sorted by key; reduceByKey – transformation that returns an RDD after combining the values for each key; aggregateByKey – transformation similar to reduceByKey, but the result value type may differ from the input value type.
Unpaired RDDs consist of any type of object. Paired (key-value) RDDs, however, support a few special operations, such as distributed "shuffle" operations and grouping or aggregating the elements by key.
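The key-wise behaviour of reduceByKey mentioned above can be sketched with plain Scala collections (a sketch of the semantics only, not Spark's actual distributed implementation): group the pairs by key, then combine each key's values with the given function.

```scala
// What reduceByKey(_ + _) does to pair data, modelled on a plain List:
// group by key, then reduce the values for each key.
val pairs = List(("a", 1), ("b", 2), ("a", 3))

val reduced: Map[String, Int] =
  pairs.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(_ + _) }
// reduced == Map("a" -> 4, "b" -> 2)
```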
Extending TupleN isn't a good idea for a number of reasons, one of the best being the fact that it's deprecated, and on 2.11 it's not even possible to extend TupleN with a case class. Even if you make your Foo a non-case class, defining it on 2.11 with -deprecation will show you this: "warning: inheritance from class Tuple2 in package scala is deprecated: Tuples will be made final in a future version.".
If what you care about is convenience of use, and you don't mind the (almost certainly negligible) overhead of the conversion to a tuple, you can enrich an RDD[Foo] with the syntax provided by PairRDDFunctions via a conversion like this:
import org.apache.spark.rdd.{ PairRDDFunctions, RDD }

case class Foo(k: String, v1: String, v2: String)

implicit def fooToPairRDDFunctions(
  rdd: RDD[Foo]
): PairRDDFunctions[String, (String, String)] =
  new PairRDDFunctions(
    rdd.map {
      case Foo(k, v1, v2) => k -> (v1, v2)
    }
  )
And then:
scala> val rdd = sc.parallelize(List(Foo("a", "b", "c"), Foo("d", "e", "f")))
rdd: org.apache.spark.rdd.RDD[Foo] = ParallelCollectionRDD[6] at parallelize at <console>:34
scala> rdd.mapValues(_._1).first
res0: (String, String) = (a,b)
The reason your version with Foo extending Tuple2[String, (String, String)] doesn't work is that RDD.rddToPairRDDFunctions targets an RDD[Tuple2[K, V]], and RDD isn't covariant in its type parameter, so an RDD[Foo] isn't an RDD[Tuple2[K, V]]. A simpler example might make this clearer:
case class Box[A](a: A)

class Foo(k: String, v: String) extends Tuple2[String, String](k, v)

class PairBoxFunctions(box: Box[(String, String)]) {
  def pairValue: String = box.a._2
}

implicit def toPairBoxFunctions(box: Box[(String, String)]): PairBoxFunctions =
  new PairBoxFunctions(box)
And then:
scala> Box(("a", "b")).pairValue
res0: String = b
scala> Box(new Foo("a", "b")).pairValue
<console>:16: error: value pairValue is not a member of Box[Foo]
Box(new Foo("a", "b")).pairValue
^
But if you make Box covariant…
case class Box[+A](a: A)

class Foo(k: String, v: String) extends Tuple2[String, String](k, v)

class PairBoxFunctions(box: Box[(String, String)]) {
  def pairValue: String = box.a._2
}

implicit def toPairBoxFunctions(box: Box[(String, String)]): PairBoxFunctions =
  new PairBoxFunctions(box)
…everything's fine:
scala> Box(("a", "b")).pairValue
res0: String = b
scala> Box(new Foo("a", "b")).pairValue
res1: String = b
You can't make RDD covariant, though, so defining your own implicit conversion to add the syntax is your best bet. Personally, I'd probably choose to do the conversion explicitly, but this is a relatively un-horrible use of implicit conversions.
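Doing the conversion explicitly could look like the following sketch, where a hypothetical toPair method on Foo does the work in an ordinary map step (a plain List stands in for the RDD so the snippet runs without Spark):

```scala
// Explicit conversion instead of an implicit: a toPair method on Foo,
// applied with map before using any pair operations.
case class Foo(k: String, v1: String, v2: String) {
  def toPair: (String, (String, String)) = k -> (v1, v2)
}

val rdd = List(Foo("a", "b", "c"), Foo("d", "e", "f"))
val pairs = rdd.map(_.toPair)
// pairs == List(("a", ("b", "c")), ("d", ("e", "f")))
```

The call site stays readable, and nothing magic happens behind the reader's back, at the cost of one extra method call per use.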