
How do I put a case class in an RDD and have it act like a tuple (pair)?

Say, for example, I have a simple case class:

case class Foo(k:String, v1:String, v2:String)

Can I get Spark to recognise this as a tuple for the purposes of something like the following, without converting it to a tuple in, say, a map or keyBy step?

val rdd = sc.parallelize(List(Foo("k", "v1", "v2")))
// Swap values
rdd.mapValues(v => (v._2, v._1))

I don't even care if it loses the original case class after such an operation. I've tried the following with no luck. I'm fairly new to Scala; am I missing something?

case class Foo(k:String, v1:String, v2:String)
  extends Tuple2[String, (String, String)](k, (v1, v2))

Edit: In the snippet above the case class extends Tuple2, but this does not have the desired effect: the RDD class and its functions still do not treat it like a tuple, so PairRDDFunctions such as mapValues, values and reduceByKey are not available.

asked Jan 20 '16 at 15:01 by Mark Goodall




1 Answer

Extending TupleN isn't a good idea for a number of reasons, one of the best being that it's deprecated; on Scala 2.11 it's not even possible to extend TupleN with a case class. Even if you make your Foo a non-case class, defining it on Scala 2.11 with -deprecation will show you this: "warning: inheritance from class Tuple2 in package scala is deprecated: Tuples will be made final in a future version.".

If what you care about is convenience of use and you don't mind the (almost certainly negligible) overhead of converting to a tuple, you can enrich an RDD[Foo] with the syntax provided by PairRDDFunctions through a conversion like this:

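import scala.language.implicitConversions // avoids the feature warning for implicit defs

import org.apache.spark.rdd.{ PairRDDFunctions, RDD }

case class Foo(k: String, v1: String, v2: String)

// Adds mapValues, reduceByKey, etc. to RDD[Foo] by mapping each Foo
// to a (key, (v1, v2)) pair and wrapping the result in PairRDDFunctions.
implicit def fooToPairRDDFunctions
  (rdd: RDD[Foo]): PairRDDFunctions[String, (String, String)] =
    new PairRDDFunctions(
      rdd.map {
        case Foo(k, v1, v2) => (k, (v1, v2))
      }
    )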

And then:

scala> val rdd = sc.parallelize(List(Foo("a", "b", "c"), Foo("d", "e", "f")))
rdd: org.apache.spark.rdd.RDD[Foo] = ParallelCollectionRDD[6] at parallelize at <console>:34

scala> rdd.mapValues(_._1).first
res0: (String, String) = (a,b)

The reason your version with Foo extending Tuple2[String, (String, String)] doesn't work is that RDD.rddToPairRDDFunctions targets an RDD[Tuple2[K, V]], and RDD isn't covariant in its type parameter, so an RDD[Foo] isn't an RDD[Tuple2[K, V]]. A simpler example might make this clearer:

case class Box[A](a: A)

class Foo(k: String, v: String) extends Tuple2[String, String](k, v)

class PairBoxFunctions(box: Box[(String, String)]) {
  def pairValue: String = box.a._2
}

implicit def toPairBoxFunctions(box: Box[(String, String)]): PairBoxFunctions =
  new PairBoxFunctions(box)

And then:

scala> Box(("a", "b")).pairValue
res0: String = b

scala> Box(new Foo("a", "b")).pairValue
<console>:16: error: value pairValue is not a member of Box[Foo]
       Box(new Foo("a", "b")).pairValue
                              ^

But if you make Box covariant…

case class Box[+A](a: A)

class Foo(k: String, v: String) extends Tuple2[String, String](k, v)

class PairBoxFunctions(box: Box[(String, String)]) {
  def pairValue: String = box.a._2
}

implicit def toPairBoxFunctions(box: Box[(String, String)]): PairBoxFunctions =
  new PairBoxFunctions(box)

…everything's fine:

scala> Box(("a", "b")).pairValue
res0: String = b

scala> Box(new Foo("a", "b")).pairValue
res1: String = b

You can't make RDD covariant, though, so defining your own implicit conversion to add the syntax is your best bet. Personally I'd probably choose to do the conversion explicitly, but this is a relatively un-horrible use of implicit conversions.
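If you go the explicit route instead, a minimal sketch might look like the following. It assumes a plain Scala Seq standing in for the RDD so the example runs without Spark on the classpath, and the toPair and swapValues names are hypothetical, not part of Spark. On a real RDD[Foo], rdd.map(_.toPair) yields an RDD[(String, (String, String))], which Spark's built-in conversion already enriches with mapValues, reduceByKey and the other PairRDDFunctions.

```scala
// Plain case class: no inheritance from Tuple2 needed.
case class Foo(k: String, v1: String, v2: String) {
  // Hypothetical helper that builds the (key, (v1, v2)) view explicitly.
  def toPair: (String, (String, String)) = (k, (v1, v2))
}

object ExplicitPairs {
  // Seq stands in for an RDD here; with Spark the equivalent would be
  // rdd.map(_.toPair).mapValues(_.swap).
  def swapValues(foos: Seq[Foo]): Seq[(String, (String, String))] =
    foos.map(_.toPair).map { case (k, v) => (k, v.swap) }
}
```

The cost is one visible map call, in exchange for not depending on an implicit conversion that has to be in scope wherever the pair syntax is used.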

answered Oct 08 '22 at 07:10 by Travis Brown