
SparkContext not serializable inside a companion object

I'm currently trying to extend a machine learning application that uses Scala and Spark. I'm using the structure of a previous project by Dieterich Lawson that I found on GitHub:

https://github.com/dieterichlawson/admm

This project basically uses SparkContext to build an RDD of blocks of training samples, and then performs local computations on each of these blocks (for example, solving a linear system).

I was following the same scheme, but for my local computation I need to run an L-BFGS algorithm on each block of training samples. To do so, I wanted to use the L-BFGS implementation from MLlib, which has the following signature.

runLBFGS(RDD<scala.Tuple2<Object,Vector>> data, Gradient gradient, 
         Updater updater, int numCorrections, double convergenceTol, 
         int maxNumIterations, double regParam, Vector initialWeights)

As the signature shows, the method takes the training samples as an RDD of (Object, Vector) pairs. The problem is that, locally on each worker, I no longer have the data in RDD form. Therefore, I tried to call SparkContext's parallelize on each block of the matrix, but when I do this I get a serialization exception. (The exact exception message is at the end of the question.)

Here is a detailed explanation of how I'm handling the SparkContext.

First, in the main application it is used to open a text file, and it is then passed to the factory method of the class LogRegressionXUpdate:

val A = sc.textFile("ds1.csv")
A.checkpoint
val f = LogRegressionXUpdate.fromTextFile(A,params.rho,1024,sc)

In the application, the class LogRegressionXUpdate is implemented as follows:

class LogRegressionXUpdate(val training: RDD[(Double, NV)],
                           val rho: Double)
    extends Function1[BDV[Double], Double] with Prox with Serializable {

  def prox(x: BDV[Double], rho: Double): BDV[Double] = {
    val numCorrections = 10
    val convergenceTol = 1e-4
    val maxNumIterations = 20
    val regParam = 0.1
    val (weights, loss) = LBFGS.runLBFGS(
      training,
      new GradientForLogRegADMM(rho, fromBreeze(x)),
      new SimpleUpdater(),
      numCorrections,
      convergenceTol,
      maxNumIterations,
      regParam,
      fromBreeze(x))
    toBreeze(weights.toArray).toDenseVector
  }

  def apply(x: BDV[Double]): Double = {
    Math.pow(1, 2.0)
  }
}

With the following companion object:

object LogRegressionXUpdate {
  def fromTextFile(file: RDD[String],
                   rho: Double,
                   blockHeight: Int = 1024,
                   @transient sc: SparkContext): RDF[LogRegressionXUpdate] = {
    val fns = new BlockMatrix(file, blockHeight).blocks.map { X =>
      new LogRegressionXUpdate(
        sc.parallelize(X(*, ::).map(fila => (fila(-1), fromBreeze(fila(0 to -2)))).toArray),
        rho)
    }
    new RDF[LogRegressionXUpdate](fns, 0L)
  }
}

This factory method causes a serialization error, even though I don't really need the SparkContext to build each RDD locally. I've searched for solutions to this problem, and adding @transient didn't solve it. So my question is: is it really possible to build these "second layer" RDDs, or am I forced to use a non-distributed version of the L-BFGS algorithm? Thanks in advance!

Error Log:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:294)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:293)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
at org.apache.spark.rdd.RDD.map(RDD.scala:293)
at admm.functions.LogRegressionXUpdate$.fromTextFile(LogRegressionXUpdate.scala:70)
at admm.examples.Lasso$.run(Lasso.scala:96)
at admm.examples.Lasso$$anonfun$main$1.apply(Lasso.scala:70)
at admm.examples.Lasso$$anonfun$main$1.apply(Lasso.scala:69)
at scala.Option.map(Option.scala:145)
at admm.examples.Lasso$.main(Lasso.scala:69)
at admm.examples.Lasso.main(Lasso.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@20576557)
- field (class: admm.functions.LogRegressionXUpdate$$anonfun$1, name: sc$1, type: class org.apache.spark.SparkContext)
- object (class admm.functions.LogRegressionXUpdate$$anonfun$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
... 21 more
asked Jul 16 '15 by Gabriel M


1 Answer

RDDs should only be accessed from the driver. Whenever you call something like

myRDD.map(someObject.someMethod)

Spark serializes whatever is needed for the computation of someMethod and sends it to the workers. There, the method is deserialized and then run on each partition independently.

You, however, are trying to use a method that itself uses Spark: you attempt to create a new RDD inside the closure. This is not possible, because RDDs can only be created on the driver. The error you see is Spark trying to serialize the SparkContext itself, since the closure run on each block needs it. More about serialization can be found in the first answer to this question.
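To make the distinction concrete, here is a minimal sketch (the names NestedRddSketch, blocks and localSolve are placeholders, not part of the original project) contrasting the failing pattern with one that keeps the per-block work purely local:

import breeze.linalg.DenseMatrix
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object NestedRddSketch {
  // Placeholder for whatever purely local computation runs on one block.
  def localSolve(block: DenseMatrix[Double]): DenseMatrix[Double] = block

  def run(sc: SparkContext, blocks: RDD[DenseMatrix[Double]]): Unit = {
    // Fails: the closure captures sc, which is not serializable, and
    // RDDs cannot be created on the workers anyway.
    // blocks.map(block => sc.parallelize(block.toArray))

    // Works: each block stays a plain local data structure on its worker.
    val solved = blocks.map(block => localSolve(block))
    solved.count()  // force evaluation
  }
}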

"... though I'm not really needing the SparkContext to build each RDD locally" - actually this is exactly what you are doing when calling sc.parallelize. Bottom line - you need to find (or write) a local implementation of L-BFGS.

answered Nov 15 '22 by Amir