 

Merging RDDs using Scala Apache Spark

I have two RDDs:

    RDD1: ((String, String), Int)
    RDD2: (String, Int)

For example:

    RDD1

    ((A, X), 1)
    ((B, X), 2)
    ((A, Y), 2)
    ((C, Y), 3)

    RDD2

    (A, 6)
    (B, 7)
    (C, 8)

Output Expected

    ((A, X), 6)
    ((B, X), 14)
    ((A, Y), 12)
    ((C, Y), 24)

In RDD1, the (String, String) combination is unique, and in RDD2, every String key is unique. The score of A from RDD2 (6) gets multiplied with the score values of all entries in RDD1 whose key contains A.

6 = 6 * 1
14 = 7 * 2
12 = 6 * 2
24 = 8 * 3

I wrote the following, but it gives me an error on case:

    val finalRdd = countRdd.join(countfileRdd).map(case (k, (ls, rs)) => (k, (ls * rs)))

Can someone help me out with this?

AngryPanda asked Feb 06 '26 20:02

1 Answer

Your first RDD doesn't have the same key type as the second RDD (the tuple (A, X) versus A). You should transform it before joining:

    // Re-key rdd1 on the first tuple element so both RDDs share a String key
    val rdd1 = sc.parallelize(List((("A", "X"), 1), (("A", "Y"), 2)))
    val rdd2 = sc.parallelize(List(("A", 6)))
    val rdd1Transformed = rdd1.map {
      case ((letter, coord), value) => (letter, (coord, value))
    }
    // Join on the shared key, multiply the values, then restore the original key shape
    val result = rdd1Transformed
      .join(rdd2)
      .map {
        case (letter, ((coord, v1), v2)) => ((letter, coord), v1 * v2)
      }
    result.collect()
    res1: Array[((String, String), Int)] = Array(((A,X),6), ((A,Y),12))
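The snippet above uses a reduced dataset. As a minimal sketch (using plain Scala collections instead of RDDs, so no SparkContext is needed), the same re-key, join, and multiply logic over the full data from the question produces the expected output:

```scala
// Full data from the question, as plain collections
val rdd1 = List((("A", "X"), 1), (("B", "X"), 2), (("A", "Y"), 2), (("C", "Y"), 3))
val rdd2 = Map("A" -> 6, "B" -> 7, "C" -> 8)

// For each ((letter, coord), value), look up the letter's score and multiply
val result = rdd1.map { case ((letter, coord), value) =>
  ((letter, coord), value * rdd2(letter))
}
// ((A,X),6), ((B,X),14), ((A,Y),12), ((C,Y),24)
```

A Map lookup plays the role of the join here; on real RDDs you would use the join-based version above so the work is distributed.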
Nikita answered Feb 09 '26 11:02


