There is a table with two columns, books and readers of these books, where books and readers are book and reader IDs, respectively:

books readers
    1      30
    2      10
    3      20
    1      20
    1      10
    2      30
The record (book = 1, reader = 30) means that the book with id = 1 was read by the user with id = 30.
For each book pair I need to count the number of readers who read both books, with this algorithm:
for each book
  for each reader of the book
    for each other_book in books of the reader
      increment common_reader_count ((book, other_book), cnt)
The advantage of this algorithm is that it requires far fewer operations than enumerating all pairwise book combinations and counting their common readers directly.
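To make this concrete, here is a minimal local sketch of the same counting on plain Scala collections (not Spark); the data and names are illustrative:

val recs = Seq((1, 30), (2, 10), (3, 20), (1, 20), (1, 10), (2, 30)) // (book, reader)

// reader -> books and book -> readers lookups
val booksOfReader = recs.groupBy(_._2).map { case (reader, rows) => reader -> rows.map(_._1) }
val readersOfBook = recs.groupBy(_._1).map { case (book, rows) => book -> rows.map(_._2) }

val commonReaderCount = readersOfBook.toSeq
  .flatMap { case (book, readers) =>
    for {
      reader    <- readers               // for each reader of the book
      otherBook <- booksOfReader(reader) // for each other_book in books of the reader
    } yield ((book, otherBook), 1)       // emit ((book, other_book), 1)
  }
  .groupBy(_._1)
  .map { case (pair, ones) => pair -> ones.size } // sum up common_reader_count

// e.g. commonReaderCount((1, 2)) == 2, because readers 10 and 30 read both books 1 and 2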
To implement the above algorithm I organize this data into two groups: 1) an RDD keyed by book, containing the readers of each book, and 2) an RDD keyed by reader, containing the books read by each reader, as in the following program:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.log4j.Logger
import org.apache.log4j.Level

object Small {

  case class Book(book: Int, reader: Int)
  case class BookPair(book1: Int, book2: Int, cnt: Int)

  val recs = Array(
    Book(book = 1, reader = 30),
    Book(book = 2, reader = 10),
    Book(book = 3, reader = 20),
    Book(book = 1, reader = 20),
    Book(book = 1, reader = 10),
    Book(book = 2, reader = 30))

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    // set up environment
    val conf = new SparkConf()
      .setAppName("Test")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)

    val data = sc.parallelize(recs)
    val bookMap = data.map(r => (r.book, r))
    val bookGrps = bookMap.groupByKey
    val readerMap = data.map(r => (r.reader, r))
    val readerGrps = readerMap.groupByKey

    // *** Calculate book pairs
    // Iterate book groups
    val allBookPairs = bookGrps.map(bookGrp => bookGrp match {
      case (book, recIter) =>
        // Iterate user groups
        recIter.toList.map(rec => {
          // Find readers for this book
          val aReader = rec.reader
          // Find all books (including this one) that this reader read
          val allReaderBooks = readerGrps.filter(readerGrp => readerGrp match {
            case (reader2, recIter2) => reader2 == aReader
          })
          val bookPairs = allReaderBooks.map(readerTuple => readerTuple match {
            case (reader3, recIter3) => recIter3.toList.map(rec => ((book, rec.book), 1))
          })
          bookPairs
        })
    })

    val x = allBookPairs.flatMap(identity)
    val y = x.map(rdd => rdd.first)
    val z = y.flatMap(identity)
    val p = z.reduceByKey((cnt1, cnt2) => cnt1 + cnt2)
    val result = p.map(bookPair => bookPair match {
      case ((book1, book2), cnt) => BookPair(book1, book2, cnt)
    })

    val resultCsv = result.map(pair => resultToStr(pair))
    resultCsv.saveAsTextFile("./result.csv")
  }

  def resultToStr(pair: BookPair): String = {
    val sep = "|"
    pair.book1 + sep + pair.book2 + sep + pair.cnt
  }
}
In fact, this implementation results in a different, inefficient algorithm:
for each book
  find each reader of the book, scanning all readers every time!
    for each other_book in books of the reader
      increment common_reader_count ((book, other_book), cnt)
which contradicts the main goal of the algorithm discussed above: instead of reducing the number of operations, it increases them. Finding a user's books requires filtering all users for every book, so the number of operations is ~ N * M, where N is the number of users and M is the number of books.
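For reference, the single pass I actually want could be expressed without nesting RDDs, roughly like this (an illustrative sketch reusing data from the program above; this is not what the program currently does):

val intendedPairs = data
  .map(r => (r.reader, r.book))       // (reader, book)
  .groupByKey                         // reader -> all books this reader read
  .flatMap { case (_, books) =>
    for {
      book      <- books              // for each book of the reader
      otherBook <- books              // for each other_book in books of the reader
    } yield ((book, otherBook), 1)
  }
  .reduceByKey(_ + _)                 // ((book, other_book), common_reader_count)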
Questions:
Besides being inefficient, this implementation fails with a filter exception whose cause I cannot figure out. Any ideas? Please see the exception log below:
15/05/29 18:24:05 WARN util.Utils: Your hostname, localhost.localdomain resolves to a loopback address: 127.0.0.1; using 10.0.2.15 instead (on interface eth0)
15/05/29 18:24:05 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/05/29 18:24:09 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/05/29 18:24:10 INFO Remoting: Starting remoting
15/05/29 18:24:10 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:38910]
15/05/29 18:24:10 INFO Remoting: Remoting now listens on addresses: [akka.tcp://[email protected]:38910]
15/05/29 18:24:12 ERROR executor.Executor: Exception in task 0.0 in stage 6.0 (TID 4)
java.lang.NullPointerException
at org.apache.spark.rdd.RDD.filter(RDD.scala:282)
at Small$$anonfun$4$$anonfun$apply$1.apply(Small.scala:58)
at Small$$anonfun$4$$anonfun$apply$1.apply(Small.scala:54)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at Small$$anonfun$4.apply(Small.scala:54)
at Small$$anonfun$4.apply(Small.scala:51)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
at org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Update:
This code:
val df = sc.parallelize(Array((1,30), (2,10), (3,20), (1,10), (2,30))).toDF("books","readers")
val results = df.join(
  df.select($"books" as "r_books", $"readers" as "r_readers"),
  $"readers" === $"r_readers" and $"books" < $"r_books"
)
  .groupBy($"books", $"r_books")
  .agg($"books", $"r_books", count($"readers"))
Gives the following result:
books r_books COUNT(readers)
1 2 2
So COUNT(readers) here is the number of times the two books (here 1 and 2) were read together, i.e. the number of their common readers.
This kind of thing is a lot easier if you convert the original RDD to a DataFrame:
val df = sc.parallelize(
  Array((1,30), (2,10), (3,20), (1,10), (2,30))
).toDF("books","readers")
Once you do that, just do a self-join on the DataFrame to make book pairs, then count how many readers have read each book pair:
val results = df.join(
  df.select($"books" as "r_books", $"readers" as "r_readers"),
  $"readers" === $"r_readers" and $"books" < $"r_books"
).groupBy(
  $"books", $"r_books"
).agg(
  $"books", $"r_books", count($"readers")
)
As for additional explanation about that join, note that I am joining df back onto itself, i.e. a self-join: df.join(df.select(...), ...). What you are looking to do is to stitch together book #1 ($"books") with a second book ($"r_books") read by the same reader ($"readers" === $"r_readers"). But if you joined only on $"readers" === $"r_readers", you would get each book joined back onto itself. Instead, I use $"books" < $"r_books" to ensure that the ordering in the book pairs is always (<lower_id>, <higher_id>).
Once you do the join, you get a DataFrame with a line for every reader of every book pair. The groupBy and agg functions do the actual counting of the number of readers per book pair.
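For the five sample rows, only books 1 and 2 share any readers (readers 10 and 30), so the output reduces to the single row already shown in the update above:

results.show()
// expected result (modulo exact formatting / column naming across Spark versions):
// books  r_books  COUNT(readers)
//     1        2               2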
Incidentally, if a reader read the same book twice, I believe you would end up with double-counting, which may or may not be what you want. If that's not what you want, just change count($"readers") to countDistinct($"readers").
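For example (a sketch assuming the same df as above, with countDistinct imported from org.apache.spark.sql.functions):

val resultsDistinct = df.join(
  df.select($"books" as "r_books", $"readers" as "r_readers"),
  $"readers" === $"r_readers" and $"books" < $"r_books"
).groupBy(
  $"books", $"r_books"
).agg(
  $"books", $"r_books", countDistinct($"readers")  // each reader counted at most once per book pair
)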
If you want to know more about the agg functions count() and countDistinct() and a bunch of other fun stuff, check out the scaladoc for org.apache.spark.sql.functions.