
Dynamic Set Algebra on Spark

Consider the following problem. Given:

  1. A collection of sets
  2. A boolean expression on them that is received dynamically

Return the resulting set.

Does Spark have any efficient algorithms or libraries to solve this general problem?

Here is a toy example to illustrate the problem conceptually:

val X  = Set("A1", "A2", "A3", "A4")
val Y  = Set("A2", "A4", "A5")

val collection = Set(X, Y)
val expression = "X and Y"

I am looking for a way of implementing a general solve_expression so that, in the example above:

output = solve_expression(expression, collection)

results in:

Set("A2", "A5")

I am working with sets of millions of items, and boolean expressions that arrive as strings. What's important is that each atom in the expression (e.g. "X" and "Y" above) is a set. The expressions and sets are dynamic (the operations can't be hard-coded, since we receive them as input and don't know them beforehand).

I am flexible with the representation of the problem. The actual sets can be of type Set holding strings (e.g. "A1", "A2"), encoded as binary vectors, or anything else that makes this amenable to Spark.

Does Spark have any libraries to parse and solve general boolean expressions on sets?

asked Oct 30 '22 by Amelio Vazquez-Reina

1 Answer

Alright. Let's suppose you want to do this in Spark. Furthermore, since these are giant sets, let's suppose they aren't already in memory: each set lives in its own file, with each line denoting one entry of the set.

We will represent sets with RDDs - Spark's standard way of storing data.

Using this parser (adapted and fixed from here):

import scala.util.parsing.combinator.JavaTokenParsers
import org.apache.spark.rdd.RDD

case class Query[T](setMap: Map[String, RDD[T]]) extends JavaTokenParsers {
  // Lowest precedence: one or more terms joined by "union"
  private lazy val expr: Parser[RDD[T]]
    = term ~ rep("union" ~ term) ^^ { case f1 ~ fs => (f1 /: fs)(_ union _._2) }
  // Binds tighter than "union": one or more factors joined by "inter"
  private lazy val term: Parser[RDD[T]]
    = fact ~ rep("inter" ~ fact) ^^ { case f1 ~ fs => (f1 /: fs)(_ intersection _._2) }
  // A factor is either a set variable or a parenthesized sub-expression
  private lazy val fact: Parser[RDD[T]]
    = vari | ("(" ~ expr ~ ")" ^^ { case "(" ~ exp ~ ")" => exp })
  // Each known set name parses as a literal keyword and maps to its RDD
  private lazy val vari: Parser[RDD[T]]
    = setMap.keysIterator.map(Parser(_)).reduceLeft(_ | _) ^^ setMap

  def apply(expression: String) = this.parseAll(expr, expression).get.distinct
}
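One caveat about apply: parseAll(...).get throws on malformed input. Here is a hedged sketch of a safer variant you could add inside Query (the solve name is my own, not part of the code above) that surfaces parse errors instead of throwing:

// Hypothetical companion to `apply`: report parse errors via Either
// instead of throwing. Goes inside the Query case class.
def solve(expression: String): Either[String, RDD[T]] =
  this.parseAll(expr, expression) match {
    case Success(result, _) => Right(result.distinct)
    case failure: NoSuccess => Left(failure.msg)
  }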

Observe the following spark-shell interaction after pasting the above into the shell (I've omitted some of the replies for brevity):

> val x = sc.textFile("X.txt").cache // contains "1\n2\n3\n4\n5"
> val y = sc.textFile("Y.txt").cache // contains "3\n4\n5\n6\n7"
> val z = sc.textFile("Z.txt").cache // contains "3\n9\n10"
> val sets = Map("x" -> x, "y" -> y, "z" -> z)
> val query = Query[String](sets)    // textFile yields RDD[String], so T = String

Now, I can call query with different expressions. Notice that here I'm using collect to trigger evaluation (so we can see what is inside each result), but if the sets are really big, you would normally just keep the RDD as is (or save it to disk; see the sketch after these examples).

> query("a union b").collect
res: Array[Int] = Array("1", "2", "3", "4", "5", "6", "7")
> query("a inter b").collect
res: Array[Int] = Array("3", "4", "5")
> query("a inter b union ((a inter b) union a)").collect
res: Array[Int] = Array("1", "2", "3", "4", "5")
> query("c union a inter b").collect
res: Array[Int] = Array("3", "4", "5", "9", "10")
> query("(c union a) inter b").collect
res: Array[Int] = Array("3", "4", "5")

Although I didn't bother to implement it, set difference should be a one-line addition (very similar to union and inter); a sketch follows below. I think set complements are a bad idea, as they don't always make sense (what is the complement of the empty set, and how would you represent it?).
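For illustration, here is a hedged sketch of what that one-line addition could look like (my own extension, not part of the original code): give "diff" the same precedence as "inter" and map it onto RDD's subtract:

// Drop-in replacement for the `term` production above;
// "diff" is left-associative, just like "inter".
private lazy val term: Parser[RDD[T]]
  = fact ~ rep(("inter" | "diff") ~ fact) ^^ {
      case f1 ~ fs => (f1 /: fs) {
        case (acc, "inter" ~ rhs) => acc intersection rhs
        case (acc, _ ~ rhs)       => acc subtract rhs  // set difference
      }
    }

With the sets above, query("x diff y").collect should then yield Array("1", "2").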

answered Nov 09 '22 by Alec