
Using stat.bloomFilter in Spark 2.0.0 to filter another dataframe

I have two large DataFrames: [a] one that has all events, identified by an id, and [b] a list of ids. I want to filter [a] based on the ids in [b] using the stat.bloomFilter implementation in Spark 2.0.0.

However, I don't see any operations in the Dataset API to join the Bloom filter to DataFrame [a]:

val in1 = spark.sparkContext.parallelize(List(0, 1, 2, 3, 4, 5))
val df1 = in1.map(x => (x, x+1, x+2)).toDF("c1", "c2", "c3")

val in2 = spark.sparkContext.parallelize(List(0, 1, 2))
val df2 = in2.toDF("c1")

val expectedNumItems: Long = 1000
val fpp: Double = 0.005

val sbf = df1.stat.bloomFilter($"c1", expectedNumItems, fpp)
val sbf2 = df2.stat.bloomFilter($"c1", expectedNumItems, fpp)

What is the best way to filter df1 based on the values in df2?

Thanks!

asked Feb 01 '17 by Yash


3 Answers

You can use a UDF:

import org.apache.spark.sql.functions.udf

// Use java.lang.Integer so the null check is meaningful (a Scala Int can never be null)
def might_contain(f: org.apache.spark.util.sketch.BloomFilter) = udf((x: Integer) =>
  if (x != null) f.mightContain(x) else false)

df1.where(might_contain(sbf2)($"c1"))
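
If df1 is large, it can be worth broadcasting the filter once instead of letting the UDF closure ship it with every task; a minimal sketch of that variant (the bcast and might_contain_bc names are mine):

// Sketch: broadcast the Bloom filter so each executor deserializes it once.
val bcast = spark.sparkContext.broadcast(sbf2)

val might_contain_bc = udf((x: Integer) =>
  if (x != null) bcast.value.mightContain(x) else false)

df1.where(might_contain_bc($"c1")).show()
// Barring the small false-positive rate (fpp = 0.005), this prints the rows with c1 in {0, 1, 2}.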
answered Nov 15 '22 by zero323


I think I found the correct way to do this, but I'd still welcome pointers to better approaches.

Here's my solution:

val in1 = spark.sparkContext.parallelize(List(0, 1, 2, 3, 4, 5))
val d1 = in1.map(x => (x, x+1, x+2)).toDF("c1", "c2", "c3")

val in2 = spark.sparkContext.parallelize(List(0, 1, 2))
val d2 = in2.toDF("c1")

// expectedNumItems and fpp as defined in the question
val s2 = d2.stat.bloomFilter($"c1", expectedNumItems, fpp)

// Broadcast the filter so each executor gets a single copy
val a = spark.sparkContext.broadcast(s2)

// Drop to the RDD API to filter, since the Dataset API exposes no Bloom-filter operation
val x = d1.rdd.filter(r => a.value.mightContain(r(0)))

// Map the filtered rows back to a typed DataFrame
case class newType(c1: Int, c2: Int, c3: Int) extends Serializable

val xDF = x.map(y => newType(y.getInt(0), y.getInt(1), y.getInt(2))).toDF()

scala> d1.show(10)
+---+---+---+
| c1| c2| c3|
+---+---+---+
|  0|  1|  2|
|  1|  2|  3|
|  2|  3|  4|
|  3|  4|  5|
|  4|  5|  6|
|  5|  6|  7|
+---+---+---+

scala> d2.show(10)
+---+
| c1|
+---+
|  0|
|  1|
|  2|
+---+

scala> xDF.show(10)
+---+---+---+
| c1| c2| c3|
+---+---+---+
|  0|  1|  2|
|  1|  2|  3|
|  2|  3|  4|
+---+---+---+
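
For what it's worth, here is a sketch of a variant that skips the RDD round-trip and the helper case class by filtering through the typed Dataset API (reusing the broadcast a from above; the xDS name is mine):

// Sketch: stay in the Dataset API instead of dropping to RDDs.
val xDS = d1.as[(Int, Int, Int)]
  .filter(t => a.value.mightContain(t._1))
  .toDF("c1", "c2", "c3")

xDS.show(10)
// Should print the same three rows as xDF above, barring false positives.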
answered Nov 15 '22 by Yash


I built an implicit class that wraps https://stackoverflow.com/a/41989703/6723616. Comments welcome!

/**
  * Copyright 2017 Yahoo, Inc.
  * Zlib license: https://www.zlib.net/zlib_license.html
  */

package me.klotz.spark.utils

import org.apache.spark.sql.functions._
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.util.sketch.BloomFilter
import org.apache.spark.SparkContext

object BloomFilterEnhancedJoin {

  // not parameterized for field type; assumes String
  /**
    *  Like bigDF.join(smallDF), but accelerated with a Bloom filter.
    *  You pass in a size estimate of the bigDF, and a ratio of acceptable false positives out of the expected result set size.
    *  ratio=1 is a good start; that yields roughly 50% false positives in the big-small join, so the filter lets through
    *  about as many false positives as true matches, rather than rejecting almost everything.  Pass in a size estimate
    *  of the big DataFrame to avoid enumerating it; the small DataFrame gets counted anyway.
    *  
    *  Example use:
    *  <code>
    *  import me.klotz.spark.utils.BloomFilterEnhancedJoin._
    *  val (dups_joined, bloomFilterBroadcast) = df_big.joinBloom(1024L*1024L*1024L, dups, 10.0, "id")
    *  dups_joined.write.format("orc").save("dups")
    *  bloomFilterBroadcast.unpersist
    *  </code>
    */
  implicit class BloomFilterEnhancedJoiner(bigdf:Dataset[Row]) {
    /**
      * You should call bloomFilterBroadcast.unpersist when you are done with the result
      */
    def joinBloom(bigDFCountEstimate:Long, smallDF: Dataset[Row], ratio:Double, field:String) = {
      val sc = smallDF.sparkSession.sparkContext
      val smallDFCount = smallDF.count
      val fpr = smallDFCount.toDouble / bigDFCountEstimate.toDouble / ratio
      println(s"fpr=${fpr} = smallDFCount=${smallDFCount} / bigDFCountEstimate=${bigDFCountEstimate} / ratio=${ratio}")

      val bloomFilterBroadcast = sc.broadcast(smallDF.stat.bloomFilter(field, smallDFCount, fpr))
      val mightContain = udf((x: String) => if (x != null) bloomFilterBroadcast.value.mightContainString(x) else false)

      (bigdf.filter(mightContain(col(field))).join(smallDF, field), bloomFilterBroadcast)
    }
  }

}
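
Note that the trailing .join(smallDF, field) makes the result exact: the Bloom filter only pre-prunes the big DataFrame, and the join discards any false positives that slip through. A usage sketch (the bigDF and smallDF names and the size estimate here are hypothetical):

// Sketch: join a large DataFrame against a small one on a String column "id".
import me.klotz.spark.utils.BloomFilterEnhancedJoin._

val (joined, bfBroadcast) = bigDF.joinBloom(1000000L, smallDF, 1.0, "id")
joined.show()
bfBroadcast.unpersist()  // release the broadcast filter once the result is materialized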
answered Nov 15 '22 by Leigh Klotz