I’m testing a Spark Streaming application with the help of "com.holdenkarau.spark-testing-base" and scalatest.
import com.holdenkarau.spark.testing.StreamingSuiteBase
import org.apache.spark.rdd.RDD
import org.scalatest.{ BeforeAndAfter, FunSuite }
class Test extends FunSuite with BeforeAndAfter with StreamingSuiteBase {
var delim: String = ","
before {
System.clearProperty("spark.driver.port")
}
test(“This Fails“) {
val source = scala.io.Source.fromURL(getClass.getResource(“/some_logs.csv"))
val input = source.getLines.toList
val rowRDDOut = Calculator.do(sc.parallelize(input)) //Returns DataFrame
val report: RDD[String] = rowRDDOut.map(row => new String(row.getAs[String](0) + delim + row.getAs[String](1))
source.close
}
}
I get Serialization exception for field 'delim':
org.apache.spark.SparkException: Task not serializable
[info] at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
[info] at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
[info] at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
[info] at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
[info] at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)
[info] at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)
[info] at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
[info] at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
[info] at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
[info] at org.apache.spark.rdd.RDD.map(RDD.scala:323)
[info] ...
[info] Cause: java.io.NotSerializableException: org.scalatest.Assertions$AssertionsHelper
[info] Serialization stack:
[info] - object not serializable (class: org.scalatest.Assertions$AssertionsHelper, value: org.scalatest.Assertions$AssertionsHelper@78b339fa)
[info] - field (class: org.scalatest.FunSuite, name: assertionsHelper, type: class org.scalatest.Assertions$AssertionsHelper)
If I replace 'delim' by String value in place, it works fine.
val report: RDD[String] = rowRDDOut.map(row => new String(row.getAs[String](0) + “,” + row.getAs[String](1))
What’s the difference between first and second version?
Thanks in advance!
The problem is not the type of delim
(String) it's delim
itself.
Try not to define variables outside your test()
methods. If you define delm
inside your test
it should work.
test(“This Fails“) {
val delim = ","
...
}
Now, you may ask why? Well, when you reference delim
from the outer scope, Scala will try to bring together the enclosing object class Test
. This object contains a reference to org.scalatest.Assertions$AssertionsHelper
that it's not Serializable (see your stacktrace).
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With