I am using IntelliJ Community Edition with the Scala plugin and the Spark libraries. I am still learning Spark and am working in a Scala Worksheet.
I have written the code below, which removes punctuation marks from a String:
def removePunctuation(text: String): String = {
  val punctPattern = "[^a-zA-Z0-9\\s]".r
  punctPattern.replaceAllIn(text, "").toLowerCase
}
Then I read a text file and try to remove punctuation:
val myfile = sc.textFile("/home/ubuntu/data.txt",4).map(removePunctuation)
This gives the error below. Any help would be appreciated:
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(/home/ubuntu/src/main/scala/Test.sc:294)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(/home/ubuntu/src/main/scala/Test.sc:284)
    at org.apache.spark.util.ClosureCleaner$.clean(/home/ubuntu/src/main/scala/Test.sc:104)
    at org.apache.spark.SparkContext.clean(/home/ubuntu/src/main/scala/Test.sc:2090)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(/home/ubuntu/src/main/scala/Test.sc:366)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(/home/ubuntu/src/main/scala/Test.sc:365)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(/home/ubuntu/src/main/scala/Test.sc:147)
    at #worksheet#.#worksheet#(/home/ubuntu/src/main/scala/Test.sc:108)
Caused by: java.io.NotSerializableException: A$A21$A$A21
Serialization stack:
    - object not serializable (class: A$A21$A$A21, value: A$A21$A$A21@62db3891)
    - field (class: A$A21$A$A21$$anonfun$words$1, name: $outer, type: class A$A21$A$A21)
    - object (class A$A21$A$A21$$anonfun$words$1, )
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.map(RDD.scala:369)
    at A$A21$A$A21.words$lzycompute(Test.sc:27)
    at A$A21$A$A21.words(Test.sc:27)
    at A$A21$A$A21.get$$instance$$words(Test.sc:27)
    at A$A21$.main(Test.sc:73)
    at A$A21.main(Test.sc)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.jetbrains.plugins.scala.worksheet.MyWorksheetRunner.main(MyWorksheetRunner.java:22)
Serialization converts objects into a stream of bytes (and deserialization converts them back) so that they can be sent between nodes over the network or stored in a file or memory buffer. Spark provides two serialization libraries, Java serialization and Kryo, selected through the spark.serializer property.
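As a hedged illustration of that property, the sketch below selects Kryo on a SparkConf. The object name, app name, and local master are placeholders, and it assumes Spark is on the classpath. Note that Spark serializes closures (such as the function passed to map) with Java serialization regardless of this setting, so changing it would not be expected to make a "Task not serializable" error go away.

```scala
import org.apache.spark.SparkConf

object SerializerConfig {
  // Hypothetical app name; "local[*]" assumes a local test run.
  val conf: SparkConf = new SparkConf()
    .setAppName("serializer-demo")
    .setMaster("local[*]")
    // Applies to shuffled and cached data, not to task closures.
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
}
```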
In Java, a NotSerializableException is thrown when an instance is required to implement the Serializable interface but does not. The exception is thrown by either the serialization runtime or the instance of the class. The argument of the NotSerializableException is the name of the class; in the trace above that is A$A21$A$A21.
As T. Gaweda already pointed out, you're most likely defining your function in a class that's not serializable. Because it is a pure function, i.e. it doesn't depend on any context of the enclosing class, I suggest you put it into a standalone object that extends Serializable. This would be Scala's equivalent of a Java static method:
object Helper extends Serializable {
  def removePunctuation(text: String): String = {
    val punctPattern = "[^a-zA-Z0-9\\s]".r
    punctPattern.replaceAllIn(text, "").toLowerCase
  }
}
As @TGaweda suggests, Spark's SerializationDebugger is very helpful for identifying "the serialization path leading from the given object to the problematic object." In the "Serialization stack" section of the stack trace, the $outer field and the dollar-sign-laden class names indicate that the container object for your method is the problem: the function passed to map holds a reference to the enclosing worksheet instance, which is not serializable.
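The capture can be reproduced without Spark using plain Java serialization. The following is a minimal sketch: Container is a hypothetical stand-in for the class the worksheet generates, and Standalone and CaptureDemo are illustrative names, not anything from the original code.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for the worksheet's wrapper class; deliberately NOT Serializable.
class Container {
  def removePunctuation(text: String): String =
    "[^a-zA-Z0-9\\s]".r.replaceAllIn(text, "").toLowerCase

  // Calling an instance method inside the function forces it to capture `this`.
  val viaMethod: String => String = text => removePunctuation(text)
}

object Standalone {
  // A function value that refers to nothing outside its own body.
  val selfContained: String => String =
    text => "[^a-zA-Z0-9\\s]".r.replaceAllIn(text, "").toLowerCase
}

object CaptureDemo {
  // Returns true if plain Java serialization accepts the object.
  def serializes(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(serializes(new Container().viaMethod)) // false: drags Container along
    println(serializes(Standalone.selfContained))  // true: nothing captured
  }
}
```

The first function fails for the same reason as the trace: serializing it means serializing the instance it closes over.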
While it is easiest to just slap Serializable on your container class, I prefer to take advantage of the fact that Scala is a functional language and use your function as a first-class citizen:
sc.textFile("/home/ubuntu/data.txt", 4).map { text =>
  val punctPattern = "[^a-zA-Z0-9\\s]".r
  punctPattern.replaceAllIn(text, "").toLowerCase
}
Or if you really want to keep things separate:
val removePunctuation: String => String = (text: String) => {
  val punctPattern = "[^a-zA-Z0-9\\s]".r
  punctPattern.replaceAllIn(text, "").toLowerCase
}
sc.textFile("/home/ubuntu/data.txt", 4).map(removePunctuation)
These options work, of course, because Regex is serializable, as you should confirm.
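One way to confirm it, sketched here with plain Java serialization rather than Spark: write the Regex to a byte array, read it back, and check that the copy still works. RegexRoundTrip and roundTrip are illustrative names.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.util.matching.Regex

object RegexRoundTrip {
  // Serialize the regex with Java serialization, then deserialize a copy.
  def roundTrip(regex: Regex): Regex = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(regex)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[Regex] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val punctPattern = "[^a-zA-Z0-9\\s]".r
    val copy = roundTrip(punctPattern)
    println(copy.replaceAllIn("Hello, World!", "").toLowerCase) // hello world
  }
}
```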
On a secondary but very important note, constructing a Regex is expensive, so factor it out of your transformations for the sake of performance, possibly with a broadcast.
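A simple way to factor it out, sketched here without a broadcast: hold the compiled pattern in a field of a standalone object, so it is built once when the object is initialized in a JVM rather than once per record. CachedHelper is a hypothetical name for this variant.

```scala
object CachedHelper extends Serializable {
  // Compiled once per JVM, when CachedHelper is first initialized.
  private val punctPattern = "[^a-zA-Z0-9\\s]".r

  def removePunctuation(text: String): String =
    punctPattern.replaceAllIn(text, "").toLowerCase
}
```

It would be used the same way as before, e.g. sc.textFile("/home/ubuntu/data.txt", 4).map(CachedHelper.removePunctuation). Whether a broadcast buys anything beyond this for a single small pattern would need measuring.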