I would like to get a fast approximate set membership, based on a String-valued function applied to a large Spark RDD of String Vectors (~1B records). Basically the idea would be to reduce into a Bloom filter. This bloom filter could then be broadcasted to the workers for further use.
More specifically, I currently have
rdd: RDD[Vector[String]]
f: Vector[String] => String
val uniqueVals = rdd.map(f).distinct().collect()
val uv = sc.broadcast(uniqueVals)
But uniqueVals
is too large to be practical, and I would like to replace it with something of smaller (and known) size, i.e. a bloom filter.
My questions:
is it possible to reduce into a Bloom filter, or do I have to collect first, and then construct it in the driver?
is there a mature Scala/Java Bloom filter implementation available that would be suitable for this?
Yes, Bloom filters can be reduced, because they have some nice properties (they are monoids). This means you can do all the aggregation operations in parallel, doing effectively just one pass over the data to construct the BloomFilter for each partition and then reduce those BloomFilters together to get a single BloomFilter that you can query for contains
.
There are at least two implementations of BloomFilter in Scala and both seem mature projects (haven't actually used them in production). The first one is Breeze and the second one is Twitter's Algebird. Both contain implementations of different sketches and a lot more.
This is an example how to do that with Breeze:
import breeze.util.BloomFilter
val nums = List(1 to 20: _*).map(_.toString)
val rdd = sc.parallelize(nums, 5)
val bf = rdd.mapPartitions { iter =>
val bf = BloomFilter.optimallySized[String](10000, 0.001)
iter.foreach(i => bf += i)
Iterator(bf)
}.reduce(_ | _)
println(bf.contains("5")) // true
println(bf.contains("31")) // false
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With