Given this dummy code:
1 case class MyObject(values:mutable.LinkedHashMap[String, String])
...
2 implicit val typeInfoString:TypeInformation[String] = TypeInformation.of(classOf[String])
3 implicit val typeInfoMyObject:TypeInformation[MyObject] = TypeInformation.of(classOf[MyObject])
4
5 val env = StreamExecutionEnvironment.getExecutionEnvironment
6
7 env
8   .fromElements("one")
9   .map(str =>
10   {
11     val obj = MyObject(mutable.LinkedHashMap("key" -> str))
12     val filteredMap1:mutable.LinkedHashMap[String, String] = obj.values.filter(!_._2.contains("bla"))
13
14     obj
15   })
16   .map(obj =>
17   {
18     val filteredMap2:mutable.LinkedHashMap[String, String] = obj.values.filter(!_._2.contains("bla"))
19
20     obj
21   })
The application will crash in line 18 with the exception:
Caused by: java.lang.ClassCastException: scala.collection.mutable.HashMap cannot be cast to scala.collection.mutable.LinkedHashMap
The issue seems to be that serialization/deserialization changes the runtime type of the values member; in other words, the LinkedHashMap turns into a HashMap.
Note that the same code as in line 18 works perfectly in line 12.
With a breakpoint on line 12, the debugger (IntelliJ) shows obj.values as a LinkedHashMap; with a breakpoint on line 18, it shows obj.values as a HashMap.
What is going on here? How can I fix this? After all, LinkedHashMap implements Serializable?!
The default Kryo Chill serializer for LinkedHashMap does not preserve the map type and instead deserializes the data into a HashMap. In order to avoid this, one needs to create a serializer for the LinkedHashMap type:
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import scala.collection.mutable

class LinkedHashMapSerializer[K, V] extends Serializer[mutable.LinkedHashMap[K, V]] with Serializable {

  // Write the entry count, then every key/value pair in insertion order.
  override def write(kryo: Kryo, output: Output, `object`: mutable.LinkedHashMap[K, V]): Unit = {
    kryo.writeObject(output, `object`.size)
    for (elem <- `object`.iterator) {
      kryo.writeClassAndObject(output, elem._1)
      kryo.writeClassAndObject(output, elem._2)
    }
  }

  // Read the entry count, then rebuild a LinkedHashMap (not a HashMap) entry by entry.
  override def read(kryo: Kryo, input: Input, `type`: Class[mutable.LinkedHashMap[K, V]]): mutable.LinkedHashMap[K, V] = {
    val result = new mutable.LinkedHashMap[K, V]()
    val size = kryo.readObject(input, classOf[Int])
    for (_ <- 1 to size) {
      val key = kryo.readClassAndObject(input).asInstanceOf[K]
      val value = kryo.readClassAndObject(input).asInstanceOf[V]
      result.put(key, value)
    }
    result
  }
}
And then register it as a Kryo Serializer:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.registerTypeWithKryoSerializer(classOf[mutable.LinkedHashMap[String, String]], new LinkedHashMapSerializer())
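To see why this scheme keeps both the map type and the insertion order, here is a minimal sketch of the same idea (entry count first, then key/value pairs in iteration order) that uses plain java.io streams as a stand-in for Kryo's Output and Input. The object name LinkedHashMapRoundTrip and its write/read helpers are hypothetical and only illustrate the approach for String keys and values; they are not part of Flink or Kryo.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import scala.collection.mutable

// Hypothetical helper: mirrors the serializer's write/read logic without Kryo.
object LinkedHashMapRoundTrip {

  // Write the entry count, then each key/value pair in insertion order.
  def write(map: mutable.LinkedHashMap[String, String]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeInt(map.size)
    map.foreach { case (key, value) =>
      out.writeUTF(key)
      out.writeUTF(value)
    }
    out.flush()
    bytes.toByteArray
  }

  // Read the count, then rebuild a LinkedHashMap explicitly, entry by entry.
  def read(data: Array[Byte]): mutable.LinkedHashMap[String, String] = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    val result = mutable.LinkedHashMap.empty[String, String]
    val size = in.readInt()
    for (_ <- 1 to size) {
      val key = in.readUTF()
      val value = in.readUTF()
      result.put(key, value)
    }
    result
  }
}
```

Because read constructs the LinkedHashMap itself and inserts the entries in the order they were written, a round trip such as LinkedHashMapRoundTrip.read(LinkedHashMapRoundTrip.write(map)) should give back a LinkedHashMap with the same entries in the same order, which is what the custom Kryo serializer does for the values member.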