I have a dataset which looks like this, where each user and product ID is a string:
userA, productX
userA, productX
userB, productY
with ~2.8 million products and 300 million users; about 2.1 billion user-product associations.
My end goal is to run Spark collaborative filtering (ALS) on this dataset. Since it takes int keys for users and products, my first step is to assign a unique int to each user and product, and transform the dataset above so that users and products are represented by ints.
Here's what I've tried so far:
val rawInputData = sc.textFile(params.inputPath)
  .filter { line => !(line contains "\\N") }
  .map { line =>
    val parts = line.split("\t")
    (parts(0), parts(1)) // user, product
  }
// find all unique users and assign them IDs
val idx1map = rawInputData.map(_._1).distinct().zipWithUniqueId().cache()
// find all unique products and assign IDs
val idx2map = rawInputData.map(_._2).distinct().zipWithUniqueId().cache()
idx1map.map { case (id, idx) => id + "\t" + idx.toString }
  .saveAsTextFile(params.idx1Out)
idx2map.map { case (id, idx) => id + "\t" + idx.toString }
  .saveAsTextFile(params.idx2Out)
// join with user ID map:
// convert from (userStr, productStr) to (productStr, userIntId)
val rev = rawInputData.cogroup(idx1map).flatMap {
  case (id1, (id2s, idx1s)) =>
    val idx1 = idx1s.head
    id2s.map { (_, idx1) }
}
// join with product ID map:
// convert from (productStr, userIntId) to (userIntId, productIntId)
val converted = rev.cogroup(idx2map).flatMap {
  case (id2, (idx1s, idx2s)) =>
    val idx2 = idx2s.head
    idx1s.map { (_, idx2) }
}
// save output
val convertedInts = converted.map {
  case (a, b) => a.toInt.toString + "\t" + b.toInt.toString
}
convertedInts.saveAsTextFile(params.outputPath)
When I try to run this on my cluster (40 executors with 5 GB RAM each), it's able to produce the idx1map and idx2map files fine, but it fails with out of memory errors and fetch failures at the first flatMap after cogroup. I haven't done much with Spark before so I'm wondering if there is a better way to accomplish this; I don't have a good idea of what steps in this job would be expensive. Certainly cogroup would require shuffling the whole data set across the network; but what does something like this mean?
FetchFailed(BlockManagerId(25, ip-***.ec2.internal, 48690), shuffleId=2, mapId=87, reduceId=25)
The reason I'm not just using a hashing function is that I'd eventually like to run this on a much larger dataset (on the order of 1 billion products, 1 billion users, 35 billion associations), and the number of Int key collisions would become quite large. Is running ALS on a dataset of that scale even close to feasible?
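For a rough sense of scale, here is a back-of-the-envelope sketch of how many keys would collide. It is only my own estimate, assuming an ideal hash that spreads keys uniformly over all 2^32 Int values; `expectedCollisions` is just an illustrative helper name.

```scala
object CollisionEstimate {
  // Expected number of keys that land on an already-used Int when n distinct
  // keys are hashed uniformly into m slots:
  //   n - m * (1 - (1 - 1/m)^n)
  def expectedCollisions(n: Double, m: Double): Double = {
    // expm1/log1p keep precision when 1/m is tiny
    val occupied = -m * java.lang.Math.expm1(n * java.lang.Math.log1p(-1.0 / m))
    n - occupied
  }

  def main(args: Array[String]): Unit = {
    val intSlots = math.pow(2, 32) // number of distinct Int values
    // current product count, current user count, target scale
    for (n <- Seq(2.8e6, 3.0e8, 1.0e9)) {
      val c = expectedCollisions(n, intSlots)
      println(f"keys=$n%.1e expected collisions=$c%.3e (${100 * c / n}%.2f%% of keys)")
    }
  }
}
```

A real hash function will not be perfectly uniform, so this should be read as a lower bound on the problem rather than a prediction.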
It looks like you are essentially collecting the full list of users for each key, just to split it up again. Try using join instead of cogroup, which seems closer to what you want. For example:
import org.apache.spark.SparkContext._
// Create some fake data
val data = sc.parallelize(Seq(("userA", "productA"),("userA", "productB"),("userB", "productB")))
val userId = sc.parallelize(Seq(("userA",1),("userB",2)))
val productId = sc.parallelize(Seq(("productA",1),("productB",2)))
// Replace user names with IDs
val userReplaced = data.join(userId).map{case (_,(prod,user)) => (prod,user)}
// Replace product names with IDs
val bothReplaced = userReplaced.join(productId).map{case (_,(user,prod)) => (user,prod)}
// Check results:
bothReplaced.collect() // Array((1,1), (1,2), (2,2)); element order may vary
Please leave a comment on how well it performs.
(I have no idea what FetchFailed(...) means.)
My platform: CDH 5.7, Spark 1.6.0 (standalone).
My test data size: 31,815,167 records in total, with 31,562,704 distinct user strings and 4,140,276 distinct product strings.
My first idea was to use the collectAsMap action and then use the resulting maps to convert the user and product strings to ints. Even with driver memory raised to 12 GB, I got OOM or GC overhead exceptions, because the maps have to fit in driver memory.
So this idea only works for small data sets; a bigger data set needs more driver memory.
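The lookup step of this first idea can be sketched as follows. It uses plain Scala collections so that it runs without a cluster. `encode` and `MapLookup` are hypothetical names, and the Spark wiring in the trailing comment assumes the `rawInputData`, `idx1map` and `idx2map` RDDs from the question.

```scala
object MapLookup {
  // Translate (userStr, productStr) pairs into (userId, productId) pairs using
  // in-memory lookup tables. Pairs with an unknown key are dropped.
  def encode(pairs: Iterator[(String, String)],
             userIds: scala.collection.Map[String, Long],
             productIds: scala.collection.Map[String, Long]): Iterator[(Long, Long)] =
    pairs.flatMap { case (user, product) =>
      val ids = for {
        u <- userIds.get(user)
        p <- productIds.get(product)
      } yield (u, p)
      ids.iterator
    }

  def main(args: Array[String]): Unit = {
    val users = Map("userA" -> 0L, "userB" -> 1L)
    val products = Map("productX" -> 0L, "productY" -> 1L)
    val raw = Iterator(("userA", "productX"), ("userA", "productX"), ("userB", "productY"))
    encode(raw, users, products).foreach(println)
  }
}

// In Spark the tables would come from collectAsMap() on the driver, e.g.
//   val users = sc.broadcast(idx1map.collectAsMap())
//   val products = sc.broadcast(idx2map.collectAsMap())
//   rawInputData.mapPartitions(MapLookup.encode(_, users.value, products.value))
// Both maps must fit in driver and executor memory, which is the limit described above.
```

Because no shuffle is involved, this is attractive when the tables are small, but each table is materialised in full on the driver first.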
My second idea was to use the join method, as Tobber proposed. Here are some test results. Job setup:
I followed these steps:
The job took about 10 minutes to finish.