
Spark: Counting co-occurrence - Algorithm for efficient multi-pass filtering of huge collections

There is a table with two columns, books and readers, where books and readers are book and reader IDs, respectively:

   books readers
1:     1      30
2:     2      10
3:     3      20
4:     1      20
5:     1      10
6:     2      30

The record book = 1, reader = 30 means that the book with id = 1 was read by the user with id = 30. For each book pair I need to count the number of readers who read both of these books, using this algorithm:

for each book
  for each reader of the book
    for each other_book in books of the reader
      increment common_reader_count ((book, other_book), cnt)

The advantage of this algorithm is that it requires far fewer operations than counting readers over all pairwise book combinations.
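
To make the pseudocode concrete, here is a minimal in-memory sketch of the same counting loop in plain Scala (no Spark), run over the six sample records; readersOfBook, booksOfReader and counts are illustrative names, not part of the original program:

// Plain-Scala sketch of the counting loop above; not the Spark implementation.
val recs = Seq((1, 30), (2, 10), (3, 20), (1, 20), (1, 10), (2, 30))

val readersOfBook = recs.groupBy(_._1).mapValues(_.map(_._2)) // book   -> its readers
val booksOfReader = recs.groupBy(_._2).mapValues(_.map(_._1)) // reader -> the books read

val counts = scala.collection.mutable.Map.empty[(Int, Int), Int].withDefaultValue(0)
for {
  (book, readers) <- readersOfBook
  reader          <- readers
  otherBook       <- booksOfReader(reader)
  if otherBook != book // drop this guard to also count the trivial (book, book) pairs
} counts((book, otherBook)) += 1

// counts((1, 2)) == 2, counts((1, 3)) == 1, plus the symmetric (2, 1) and (3, 1) entries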

To implement the above algorithm I organize this data into two groups: 1) keyed by book, an RDD containing the readers of each book, and 2) keyed by reader, an RDD containing the books read by each reader, as in the following program:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.log4j.Logger
import org.apache.log4j.Level

object Small {

  case class Book(book: Int, reader: Int)
  case class BookPair(book1: Int, book2: Int, cnt:Int)

  val recs = Array(
    Book(book = 1, reader = 30),
    Book(book = 2, reader = 10),
    Book(book = 3, reader = 20),
    Book(book = 1, reader = 20),
    Book(book = 1, reader = 10),
    Book(book = 2, reader = 30))

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    // set up environment
    val conf = new SparkConf()
      .setAppName("Test")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)
    val data = sc.parallelize(recs)

    val bookMap = data.map(r => (r.book, r))
    val bookGrps = bookMap.groupByKey

    val readerMap = data.map(r => (r.reader, r))
    val readerGrps = readerMap.groupByKey

    // *** Calculate book pairs
    // Iterate book groups 
    val allBookPairs = bookGrps.map(bookGrp => bookGrp match {
      case (book, recIter) =>
        // Iterate user groups 
        recIter.toList.map(rec => {
          // Find readers for this book
          val aReader = rec.reader
          // Find all books (including this one) that this reader read
          val allReaderBooks = readerGrps.filter(readerGrp => readerGrp match {
            case (reader2, recIter2) => reader2 == aReader
          })
          val bookPairs = allReaderBooks.map(readerTuple => readerTuple match {
            case (reader3, recIter3) => recIter3.toList.map(rec => ((book, rec.book), 1))
          })
          bookPairs
        })

    })
    val x = allBookPairs.flatMap(identity)
    val y = x.map(rdd => rdd.first)
    val z = y.flatMap(identity)
    val p = z.reduceByKey((cnt1, cnt2) => cnt1 + cnt2)
    val result = p.map(bookPair => bookPair match {
      case((book1, book2),cnt) => BookPair(book1, book2, cnt)
    } )

    val resultCsv = result.map(pair => resultToStr(pair))
    resultCsv.saveAsTextFile("./result.csv")
  }

  def resultToStr(pair: BookPair): String = {
    val sep = "|"
    pair.book1 + sep + pair.book2 + sep + pair.cnt
  }
}

This implementation in fact results in a different, inefficient algorithm:

for each book
  find each reader of the book scanning all readers every time!
    for each other_book in books of the reader
      increment common_reader_count ((book, other_book), cnt)

which defeats the main goal of the algorithm discussed above: instead of reducing the number of operations, it increases it. Finding a user's books requires filtering the whole reader collection for every book, so the number of operations is ~ N * M, where N is the number of readers and M is the number of books.

Questions:

  1. Is there any way to implement the original algorithm in Spark without filtering the complete reader collection for every book?
  2. Are there other algorithms to compute book pair counts efficiently?
  3. Also, when actually running this code I get a filter exception whose cause I cannot figure out. Any ideas?

Please see the exception log below:

15/05/29 18:24:05 WARN util.Utils: Your hostname, localhost.localdomain resolves to a loopback address: 127.0.0.1; using 10.0.2.15 instead (on interface eth0)
15/05/29 18:24:05 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/05/29 18:24:09 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/05/29 18:24:10 INFO Remoting: Starting remoting
15/05/29 18:24:10 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:38910]
15/05/29 18:24:10 INFO Remoting: Remoting now listens on addresses: [akka.tcp://[email protected]:38910]
15/05/29 18:24:12 ERROR executor.Executor: Exception in task 0.0 in stage 6.0 (TID 4)
java.lang.NullPointerException
    at org.apache.spark.rdd.RDD.filter(RDD.scala:282)
    at Small$$anonfun$4$$anonfun$apply$1.apply(Small.scala:58)
    at Small$$anonfun$4$$anonfun$apply$1.apply(Small.scala:54)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at Small$$anonfun$4.apply(Small.scala:54)
    at Small$$anonfun$4.apply(Small.scala:51)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
    at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
    at org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:54)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

Update:

This code:

val df = sc.parallelize(Array((1,30),(2,10),(3,20),(1,10),(2,30))).toDF("books","readers")
val results = df.join(
  df.select($"books" as "r_books", $"readers" as "r_readers"),
  $"readers" === $"r_readers" and $"books" < $"r_books"
)
.groupBy($"books", $"r_books")
.agg($"books", $"r_books", count($"readers"))

Gives the following result:

books r_books COUNT(readers)
1     2       2     

So COUNT here is the number of times two books (here 1 and 2) were read together, i.e. the number of common readers of that pair.
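
For comparison, running the same query on the full six-record table from the top of the question (which also includes book = 1, reader = 20) would produce one more pair, since reader 20 read both book 1 and book 3:

books r_books COUNT(readers)
1     2       2
1     3       1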

asked May 29 '15 by zork

1 Answer

This kind of thing is a lot easier if you convert the original RDD to a DataFrame:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._            // for $"..." and .toDF
import org.apache.spark.sql.functions._  // for count and countDistinct

val df = sc.parallelize(
  Array((1,30),(2,10),(3,20),(1,10), (2,30))
).toDF("books","readers")

Once you do that, just do a self-join on the DataFrame to make book pairs, then count how many readers have read each book pair:

val results = df.join(
  df.select($"books" as "r_books", $"readers" as "r_readers"), 
  $"readers" === $"r_readers" and $"books" < $"r_books"
).groupBy(
  $"books", $"r_books"
).agg(
  $"books", $"r_books", count($"readers")
)

As for additional explanation of that join, note that I am joining df back onto itself -- a self-join: df.join(df.select(...), ...). What you are looking to do is to stitch together book #1 -- $"books" -- with a second book -- $"r_books" -- from the same reader -- $"readers" === $"r_readers". But if you joined only on $"readers" === $"r_readers", you would also get each book joined back onto itself. Instead, I add $"books" < $"r_books", which both excludes those self-pairs and ensures that the ordering within each book pair is always (<lower_id>,<higher_id>).

Once you do the join, you get a DataFrame with a line for every reader of every book pair. The groupBy and agg functions do the actual counting of the number of readers per book pairing.
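
Concretely, for the five sample rows used here, the join (before grouping) produces one row per shared reader of a book pair, along these lines:

books readers r_books r_readers
1     30      2       30
1     10      2       10

The groupBy and agg then collapse those two rows into the single (1, 2, 2) result shown earlier.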

Incidentally, if a reader read the same book twice, I believe you would end up with double-counting, which may or may not be what you want. If that's not what you want, just change count($"readers") to countDistinct($"readers").
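
For example, the de-duplicated variant would look like this (a sketch: only the aggregate call changes, and distinctResults is just an illustrative name):

val distinctResults = df.join(
  df.select($"books" as "r_books", $"readers" as "r_readers"),
  $"readers" === $"r_readers" and $"books" < $"r_books"
).groupBy(
  $"books", $"r_books"
).agg(
  $"books", $"r_books", countDistinct($"readers")
)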

If you want to know more about the agg functions count() and countDistinct(), and a bunch of other fun stuff, check out the scaladoc for org.apache.spark.sql.functions.

answered by David Griffin