I'm running logistic regression with SGD on a large libsvm file. The file is about 10 GB in size with 40 million training examples.
When I run my scala code with spark-submit, I notice that spark spends a lot of time logging this:
18/02/07 04:44:50 INFO HadoopRDD: Input split: file:/ebs2/preprocess/xaa:234881024+33554432
18/02/07 04:44:51 INFO Executor: Finished task 6.0 in stage 1.0 (TID 7). 875 bytes result sent to driver
18/02/07 04:44:51 INFO TaskSetManager: Starting task 8.0 in stage 1.0 (TID 9, localhost, executor driver, partition 8, PROCESS_LOCAL, 7872 bytes)
18/02/07 04:44:51 INFO TaskSetManager: Finished task 6.0 in stage 1.0 (TID 7) in 1025 ms on localhost (executor driver) (7/307)
Why is Spark doing so many 'HadoopRDD: Input splits'? What's the purpose of that, and how do I go about speeding up or getting rid of this process?
Here is the code:
import org.apache.spark.SparkContext
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.optimization.L1Updater
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
import scala.compat.Platform._
object test {
def main(args: Array[String]) {
val nnodes = 1
val epochs = 3
val conf = new SparkConf().setAppName("Test Name")
val sc = new SparkContext(conf)
val t0=currentTime
val train = MLUtils.loadLibSVMFile(sc, "/ebs2/preprocess/xaa", 262165, 4)
val test = MLUtils.loadLibSVMFile(sc, "/ebs2/preprocess/xab", 262165, 4)
val t1=currentTime;
println("START")
val lrAlg = new LogisticRegressionWithSGD()
lrAlg.optimizer.setMiniBatchFraction(10.0/40000000.0)
lrAlg.optimizer.setNumIterations(12000000)
lrAlg.optimizer.setStepSize(0.01)
val model = lrAlg.run(train)
model.clearThreshold()
val scoreAndLabels = test.map { point =>
val score = model.predict(point.features)
(score, point.label)
}
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
val auROC = metrics.areaUnderROC()
println("Area under ROC = " + auROC)
}
}
I fixed the speed issues by running
train = train.coalesce(1)
train.cache()
and by increasing the memory to a total of 64 gigs. Previously Spark might not have been caching properly due to not enough RAM.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With