
How to replace values in one RDD using the key-value pairs of another RDD?

Here are two RDDs.

Table 1 - pairs (key, value)

val table1 = sc.parallelize(Seq(("1", "a"), ("2", "b"), ("3", "c")))  
//RDD[(String, String)]

Table 2 - arrays

val table2 = sc.parallelize(Array(Array("1", "2", "d"), Array("1", "3", "e")))  
//RDD[Array[String]]

I am trying to change the elements of table2, such as "1" to "a", using the keys and values in table1. My expected result is as follows:

RDD[Array[String]] = (Array(Array("a", "b", "d"), Array("a", "c", "e")))


Is there a way to make this possible?

If so, would it be efficient using a huge dataset?

S.Kang asked Sep 28 '17

3 Answers

I think we can do it better with DataFrames while avoiding joins, since a join might involve shuffling the data.

import org.apache.spark.sql.functions.{collect_list, explode, udf}
import spark.implicits._

val table1 = spark.sparkContext.parallelize(Seq(("1", "a"), ("2", "b"), ("3", "c"))).collectAsMap()
// Broadcasting so that the mapping is available on all nodes
val broadcastedMapping = spark.sparkContext.broadcast(table1)
val table2 = spark.sparkContext.parallelize(Array(Array("1", "2", "d"), Array("1", "3", "e")))

// Look each element up in the broadcast map; keep it unchanged when there is no mapping
def changeMapping(value: String): String = {
  broadcastedMapping.value.getOrElse(value, value)
}
val changeMappingUDF = udf(changeMapping(_: String))

table2.toDF.withColumn("exploded", explode($"value"))
  .withColumn("new", changeMappingUDF($"exploded"))
  .groupBy("value")
  .agg(collect_list("new").as("mappedCol"))
  .select("mappedCol").rdd.map(r => r.getSeq[String](0).toArray)

Let me know if it suits your requirement; otherwise I can modify it as needed.
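
If the goal is just the RDD[Array[String]] from the question, the same broadcast lookup can be applied directly on the RDD, without the DataFrame round trip. A minimal sketch, assuming the broadcastedMapping and table2 values defined above:

// Replace every element that has an entry in the broadcast map; keep it unchanged otherwise
val result = table2.map(_.map(x => broadcastedMapping.value.getOrElse(x, x)))
// result: RDD[Array[String]] with rows Array(a, b, d) and Array(a, c, e)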

nareshbabral answered Nov 17 '22


You can do that with Datasets:

package dataframe

import org.apache.spark.sql.SparkSession
/**
 * @author [email protected]
 */
object Test {

  case class table1Class(key: String, value: String)
  case class table2Class(key: String, value: String, value1: String)
  def main(args: Array[String]) {
    val spark =
      SparkSession.builder()
        .appName("DataFrame-Basic")
        .master("local[4]")
        .getOrCreate()

    import spark.implicits._
    //
    val table1 = Seq(
      table1Class("1", "a"), table1Class("2", "b"), table1Class("3", "c"))

    val df1 = spark.sparkContext.parallelize(table1, 4).toDF()

    df1.show()

    val table2 = Seq(
      table2Class("1", "2", "d"), table2Class("1", "3", "e"))

    val df2 = spark.sparkContext.parallelize(table2, 4).toDF()

    df2.show()
   //
    df1.createOrReplaceTempView("A")
    df2.createOrReplaceTempView("B")

    spark.sql("select d1.key,d1.value,d2.value1  from A d1  inner join B d2 on d1.key = d2.key").show()
 //TODO: a possible completed query is sketched after the results below
 /* need to fix query
    spark.sql( "select * from (  "+ //B1.value,B1.value1,A.value
                     " select A.value,B.value,B.value1 "+
                           " from B "+
                                " left join A "+
                                     " on B.key = A.key ) B2 "+
                                             " left join A " +
                                                 " on B2.value = A.key" ).show()

          */                                       
  }
}

Results:

+---+-----+
|key|value|
+---+-----+
|  1|    a|
|  2|    b|
|  3|    c|
+---+-----+

+---+-----+------+
|key|value|value1|
+---+-----+------+
|  1|    2|     d|
|  1|    3|     e|
+---+-----+------+






+---+-----+------+
|key|value|value1|
+---+-----+------+
|  1|    a|     d|
|  1|    a|     e|
+---+-----+------+
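
A possible way to complete the commented-out query (an assumption, since the original was left as a TODO) is to join B back to A once per column that needs mapping; the inner joins assume every key in B has an entry in A, otherwise switch to left joins:

spark.sql(
  """select a1.value as col1, a2.value as col2, b.value1 as col3
    |from B b
    |join A a1 on b.key = a1.key
    |join A a2 on b.value = a2.key""".stripMargin).show()
// should print rows (a, b, d) and (a, c, e) for the sample data above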
vaquar khan answered Nov 17 '22


Is there a way to make this possible?

Yes. Use Datasets (not RDDs, which are less efficient and less expressive), join them together, and select the fields of your liking.

val table1 = Seq(("1", "a"), ("2", "b"), ("3", "c")).toDF("key", "value")
scala> table1.show
+---+-----+
|key|value|
+---+-----+
|  1|    a|
|  2|    b|
|  3|    c|
+---+-----+

val table2 = sc.parallelize(
  Array(Array("1", "2", "d"), Array("1", "3", "e"))).
  toDF("a").
  select($"a"(0) as "a0", $"a"(1) as "a1", $"a"(2) as "a2")
scala> table2.show
+---+---+---+
| a0| a1| a2|
+---+---+---+
|  1|  2|  d|
|  1|  3|  e|
+---+---+---+

scala> table2.join(table1, $"key" === $"a0").select($"value" as "a0", $"a1", $"a2").show
+---+---+---+
| a0| a1| a2|
+---+---+---+
|  a|  2|  d|
|  a|  3|  e|
+---+---+---+

Repeat for the other a columns and union together. While repeating the code, you'll notice the pattern that will make the code generic.
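
One way to capture that pattern is to chain one left join per column instead of unioning; this is a sketch under the assumption that the table1 and table2 DataFrames above are in scope, and it keeps unmapped values (such as "d" and "e") via coalesce:

import org.apache.spark.sql.functions.{coalesce, col}

// Fold over the columns, looking each one up in table1 and replacing it when a mapping exists
val replaced = Seq("a0", "a1", "a2").foldLeft(table2) { (df, c) =>
  // rename the lookup columns per iteration to avoid name clashes
  val lookup = table1.withColumnRenamed("key", s"${c}_key").withColumnRenamed("value", s"${c}_value")
  df.join(lookup, col(c) === col(s"${c}_key"), "left")
    .withColumn(c, coalesce(col(s"${c}_value"), col(c)))
    .drop(s"${c}_key", s"${c}_value")
}
// replaced.show should now print rows (a, b, d) and (a, c, e)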

If so, would it be efficient using a huge dataset?

Yes (again). We're talking Spark here and a huge dataset is exactly why you chose Spark, isn't it?
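
If table1 (the small lookup side) fits in memory while table2 is huge, a broadcast hint tells Spark to ship the lookup table to every executor instead of shuffling the large side; a sketch using the DataFrames above:

import org.apache.spark.sql.functions.broadcast

// Broadcast the small lookup table so the big table2 is not shuffled for the join
table2.join(broadcast(table1), $"key" === $"a0")
  .select($"value" as "a0", $"a1", $"a2")
  .show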

Jacek Laskowski answered Nov 17 '22