
Map is not serializable in Scala?

I am new to Scala. How come the "map" function is not serializable? How can I make it serializable? For example, my code looks like this:

val data = sc.parallelize(List(1,4,3,5,2,3,5))

def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
  val lst = List(("a", 1),("b", 2),("c",3), ("a",2))
  var res = List[Int]()
  while (iter.hasNext) {
    val cur = iter.next
    val a = lst.groupBy(x => x._1).mapValues(_.size)
    //val b= a.map(x => x._2)
    res = res ::: List(cur)
  }
  res.iterator
}

data.mapPartitions(myfunc).collect

If I uncomment the line

val b= a.map(x => x._2)

the code throws an exception:

org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: scala.collection.immutable.MapLike$$anon$2
Serialization stack:
    - object not serializable (class: scala.collection.immutable.MapLike$$anon$2, value: Map(1 -> 3))
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: a, type: interface scala.collection.immutable.Map)

Thank you very much.

Carter asked Oct 02 '15 05:10


3 Answers

This is a well-known Scala bug: https://issues.scala-lang.org/browse/SI-7005 (Map#mapValues is not serializable).

We hit this problem in our Spark apps. Appending map(identity) solves it, because it copies the lazy view returned by mapValues into a regular, serializable Map:

rdd.groupBy(_.segment).mapValues(v => ...).map(identity)
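
A minimal sketch of the same idea without Spark, assuming plain Java serialization is a fair stand-in for what Spark does when it ships a closure. Replacing mapValues with map builds a regular Map instead of a view, so the result should serialize. StrictMapSketch and roundTrip are hypothetical names used only for illustration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object StrictMapSketch {
  // Hypothetical helper: writes a value with Java serialization and reads it back.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Same sample data as in the question.
  val lst = List(("a", 1), ("b", 2), ("c", 3), ("a", 2))

  // map (unlike mapValues) builds a new, fully materialized Map.
  val counts: Map[String, Int] =
    lst.groupBy(_._1).map { case (k, v) => (k, v.size) }

  def main(args: Array[String]): Unit = {
    // Checks that the strict map survives a serialization round trip.
    println(roundTrip(counts) == counts)
  }
}
```

Unlike map(identity), this version does not go through mapValues at all, so it does not depend on how a particular Scala version implements the view.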
Eugene Zhulenev answered Oct 19 '22 12:10


The implementation of mapValues is shown below. As you can see, the class it returns is not serializable: it is only a view over the original map that applies f on each access, not a materialized copy of the data, which is why you get this error. Depending on the situation, this laziness of mapValues can also be an advantage.

protected class MappedValues[C](f: B => C) extends AbstractMap[A, C] with DefaultMap[A, C] {
    override def foreach[D](g: ((A, C)) => D): Unit = for ((k, v) <- self) g((k, f(v)))
    def iterator = for ((k, v) <- self.iterator) yield (k, f(v))
    override def size = self.size
    override def contains(key: A) = self.contains(key)
    def get(key: A) = self.get(key).map(f)
}
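
To see the view behaviour concretely, here is a small sketch (ViewSketch and calls are illustrative names, not part of the library). With the implementation above, f runs again on every lookup instead of once when the map is built. On Scala 2.13 and later, mapValues still compiles but is deprecated in favour of .view.mapValues(...).toMap.

```scala
object ViewSketch {
  // Counts how many times the mapping function is invoked.
  var calls = 0

  // No values are computed here; mapValues only wraps the original map.
  val doubled = Map("a" -> 1, "b" -> 2).mapValues { v =>
    calls += 1
    v * 2
  }

  def main(args: Array[String]): Unit = {
    // Each lookup re-applies the function to the underlying value.
    doubled("a")
    doubled("a")
    println(s"f was called $calls times")
  }
}
```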
Kshitij Kulshrestha answered Oct 19 '22 11:10


Have you tried running this same code in an application? I suspect this is an issue with the spark shell. If you want to make it work in the spark shell then you might try wrapping the definition of myfunc and its application in curly braces like so:

val data = sc.parallelize(List(1,4,3,5,2,3,5))

val result = { 
  def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
    val lst = List(("a", 1),("b", 2),("c",3), ("a",2))
    var res = List[Int]()
    while (iter.hasNext) {
      val cur = iter.next
      val a = lst.groupBy(x => x._1).mapValues(_.size)
      val b= a.map(x => x._2)
      res = res ::: List(cur)
    }
    res.iterator
  }
  data.mapPartitions(myfunc).collect
}
Jason Lenderman answered Oct 19 '22 10:10