
How to perform "Lookup" operation on Spark dataframes given multiple conditions

I'm a newbie to Spark (my version is 1.6.0) and I'm now trying to solve the problem below:

Suppose there are two source files:

  • The first one (A for short) is a big one containing columns named A1, B1 and C1, plus 80 other columns. It has 230K records.
  • The second one (B for short) is a small lookup table containing columns named A2, B2, C2 and D2. It has 250 records.

Now we need to insert a new column into A, given the logic below:

  • First look up A1, B1 and C1 in B (the corresponding columns are A2, B2 and C2); if that succeeds, return D2 as the value of the newly added column. If nothing is found...
  • Then look up A1 and B1 in B. If that succeeds, return D2. If nothing is found...
  • Use the default value "NA".

I have already read in the files and converted them into data frames. For the first condition, I got the result by left outer joining them. But I cannot find a good way to handle the next step.
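For reference, a minimal sketch of what that first-step left outer join might look like (the DataFrame names dfA and dfB are my assumption, not from the original post):

```
// Hypothetical: dfA and dfB are the DataFrames read from the two source files.
// A full match on all three key columns pulls D2 in; non-matching rows get NULL.
val step1 = dfA.join(dfB,
  dfA("A1") === dfB("A2") && dfA("B1") === dfB("B2") && dfA("C1") === dfB("C2"),
  "left_outer")
```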

My current attempt is to build a new data frame by joining A and B on the less strict condition. However, I have no clue how to update the current data frame using that one. Or is there a more intuitive and efficient way to tackle the whole problem?

Thanks for all the answers.

-----------------------------Update on 20160309--------------------------------

I finally accepted @mlk's answer. Still, great thanks to @zero323 for the excellent comments on UDF versus join; the Tungsten code generation is indeed another problem we are facing now. But since we need to do scores of lookups, with an average of 4 conditions per lookup, the former solution is more suitable...

The final solution looks something like the snippet below:

```
import sqlContext.implicits._
import org.apache.spark.sql.functions.udf
import com.github.marklister.collections.io._

case class TableType(A: String, B: String, C: String, D: String)

// Broadcast the small lookup table so every executor gets a local copy
val tableBroadcast = sparkContext.broadcast(CsvParser(TableType).parseFile("..."))

// Try the full (A, B, C) match first, then fall back to (A, B), then to "NA"
val lkupD = udf { (aStr: String, bStr: String, cStr: String) =>
  tableBroadcast.value
    .find { case TableType(a, b, c, _) => a == aStr && b == bStr && c == cStr }
    .orElse(tableBroadcast.value.find { case TableType(a, b, _, _) => a == aStr && b == bStr })
    .getOrElse(TableType("", "", "", "NA"))
    .D
}

df = df.withColumn("NEW_COL", lkupD($"A", $"B", $"C"))
```
asked Mar 01 '16 by TX Shi



2 Answers

As B is small, I think the best way to do this would be a broadcast variable and a user-defined function.

```
// However you get the data...
import org.apache.spark.sql.functions.udf
import sqlContext.implicits._

case class BType(A2: Int, B2: Int, C2: Int, D2: String)
val B = Seq(BType(1, 1, 1, "B111"), BType(1, 1, 2, "B112"), BType(2, 0, 0, "B200"))

val A = sc.parallelize(Seq(
  (1, 1, 1, "DATA"), (1, 1, 2, "DATA"), (2, 0, 0, "DATA"),
  (2, 0, 1, "NONE"), (3, 0, 0, "NONE")
)).toDF("A1", "B1", "C1", "OTHER")

// Broadcast B so all nodes have a copy of it.
val Bbroadcast = sc.broadcast(B)

// A user-defined function to find the value for D2: try the full (A2, B2, C2) match first,
// then fall back to (A2, B2). This I'm sure could be improved by whacking it into maps
// (see the sketch after this example), but this is a small example.
val findD = udf { (a: Int, b: Int, c: Int) =>
  Bbroadcast.value
    .find(x => x.A2 == a && x.B2 == b && x.C2 == c)
    .orElse(Bbroadcast.value.find(x => x.A2 == a && x.B2 == b))
    .getOrElse(BType(0, 0, 0, "NA"))
    .D2
}

// Use the UDF in a select
A.select($"A1", $"B1", $"C1", $"OTHER", findD($"A1", $"B1", $"C1").as("D")).show
```
answered by Michael Lloyd Lee mlk


Just for reference, a solution without UDFs:

```
import org.apache.spark.sql.functions.{broadcast, coalesce}

val b1 = broadcast(b.toDF("A2_1", "B2_1", "C2_1", "D_1"))
val b2 = broadcast(b.toDF("A2_2", "B2_2", "C2_2", "D_2"))

// Match on A, B and C
val expr1 = ($"A1" === $"A2_1") && ($"B1" === $"B2_1") && ($"C1" === $"C2_1")
// Match on A and B, mismatch on C
val expr2 = ($"A1" === $"A2_2") && ($"B1" === $"B2_2") && ($"C1" !== $"C2_2")

val toDrop = b1.columns ++ b2.columns

toDrop.foldLeft(a
  .join(b1, expr1, "leftouter")
  .join(b2, expr2, "leftouter")
  // If there is a match on A, B and C then D_1 is not NULL,
  // otherwise we fall back to D_2
  .withColumn("D", coalesce($"D_1", $"D_2"))
)((df, c) => df.drop(c))
```

This assumes there is at most one match in each category (all three columns, or only the first two), or that duplicate rows in the output are acceptable.
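If that assumption does not hold, one option (my addition, not part of the original answer) is to build b1 and b2 from a deduplicated view of b, so the left outer joins cannot multiply rows of a:

```
// Hypothetical: keep at most one row of b per join key before broadcasting.
val b1 = broadcast(b.toDF("A2_1", "B2_1", "C2_1", "D_1")
  .dropDuplicates(Seq("A2_1", "B2_1", "C2_1")))
val b2 = broadcast(b.toDF("A2_2", "B2_2", "C2_2", "D_2")
  .dropDuplicates(Seq("A2_2", "B2_2")))
```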

UDF vs JOIN:

There are multiple factors to consider and there is no simple answer here:

Cons:

  • Broadcast joins require passing the data to the worker nodes twice. As of now, broadcast tables are not cached (SPARK-3863) and that is unlikely to change in the near future (Resolution: Later).
  • The join operation is applied twice even when there is a full match.

Pros:

  • join and coalesce are transparent to the optimizer, while UDFs are not.
  • Operating directly on SQL expressions benefits from all the Tungsten optimizations, including code generation, while a UDF cannot (see the sketch below).
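One way to observe this difference (my addition, reusing the frames and expressions defined in the two answers above) is to compare the physical plans of the two approaches:

```
// Hypothetical: inspect the physical plans. The join/coalesce version exposes its
// expressions to Catalyst, while the UDF appears as an opaque function call.
a.join(b1, expr1, "leftouter")
 .join(b2, expr2, "leftouter")
 .withColumn("D", coalesce($"D_1", $"D_2"))
 .explain(true)

A.select(findD($"A1", $"B1", $"C1").as("D")).explain(true)
```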
answered by zero323