
Scalatest and Spark giving "java.io.NotSerializableException: org.scalatest.Assertions$AssertionsHelper"

I’m testing a Spark Streaming application with the help of "com.holdenkarau.spark-testing-base" and scalatest.

import com.holdenkarau.spark.testing.StreamingSuiteBase
import org.apache.spark.rdd.RDD
import org.scalatest.{ BeforeAndAfter, FunSuite }

class Test extends FunSuite with BeforeAndAfter with StreamingSuiteBase {

  var delim: String = ","

  before {
    System.clearProperty("spark.driver.port")
  }

  test("This Fails") {

    val source = scala.io.Source.fromURL(getClass.getResource("/some_logs.csv"))
    val input = source.getLines.toList

    val rowRDDOut = Calculator.do(sc.parallelize(input))   //Returns DataFrame

    val report: RDD[String] = rowRDDOut.map(row => new String(row.getAs[String](0) + delim + row.getAs[String](1)))

    source.close
  }
}

I get a serialization exception when the closure uses the field 'delim':

org.apache.spark.SparkException: Task not serializable
[info]   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
[info]   at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
[info]   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
[info]   at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
[info]   at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)
[info]   at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)
[info]   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
[info]   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
[info]   at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
[info]   at org.apache.spark.rdd.RDD.map(RDD.scala:323)
[info]   ...
[info]   Cause: java.io.NotSerializableException: org.scalatest.Assertions$AssertionsHelper
[info] Serialization stack:
[info]  - object not serializable (class: org.scalatest.Assertions$AssertionsHelper, value: org.scalatest.Assertions$AssertionsHelper@78b339fa)
[info]  - field (class: org.scalatest.FunSuite, name: assertionsHelper, type: class org.scalatest.Assertions$AssertionsHelper)

If I replace 'delim' with an inline String literal, it works fine.

val report: RDD[String] = rowRDDOut.map(row => new String(row.getAs[String](0) + "," + row.getAs[String](1)))

What’s the difference between the first and second versions?

Thanks in advance!

Tushar Sudake asked Feb 07 '17 03:02



1 Answer

The problem is not the type of delim (String); it's where delim is defined.

Try not to define variables outside your test() methods. If you define delim inside your test, it should work.

test("This Fails") {
   val delim = ","
   ...
}

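Now, you may ask why? delim is a field of the class Test, so inside the closure delim really means this.delim. To ship the closure to the executors, Spark has to serialize it together with everything it captures, which here is the whole enclosing Test instance. That instance holds a reference to org.scalatest.Assertions$AssertionsHelper (the assertionsHelper field in your stack trace), which is not Serializable. A string literal or a local val is captured by value, so the Test instance is never pulled in.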
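The capture behaviour can be reproduced without Spark or ScalaTest. What follows is only a minimal sketch, assuming plain Java serialization as a stand-in for Spark's closure check; Suite and ClosureDemo are hypothetical names made up for this illustration, not part of either library.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a test class: it is NOT Serializable,
// just like a FunSuite that holds an AssertionsHelper.
class Suite {
  var delim: String = ","

  // `delim` here means `this.delim`, so the function captures `this`.
  def capturesThis: String => String = s => s + delim

  // Copy the field into a local val first; the function then
  // captures only the String, not the enclosing instance.
  def capturesLocal: String => String = {
    val d = delim
    s => s + d
  }
}

object ClosureDemo {
  // Returns true if `obj` can be written with Java serialization.
  def serializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val suite = new Suite
    println(s"captures this:  ${serializable(suite.capturesThis)}")
    println(s"captures local: ${serializable(suite.capturesLocal)}")
  }
}
```

The first function should fail to serialize because it drags the Suite instance along, while the second should succeed. The same local-copy trick (val d = delim inside the test body) is an alternative if the field has to stay at class level.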

marios answered Oct 14 '22 22:10
