LinkedHashMap changes to HashMap and crashes in flink data stream operators

Given this dummy code:

 1 case class MyObject(values:mutable.LinkedHashMap[String, String])

...

 2    implicit val typeInfoString:TypeInformation[String] = TypeInformation.of(classOf[String])
 3    implicit val typeInfoMyObject:TypeInformation[MyObject] = TypeInformation.of(classOf[MyObject])
 4
 5    val env = StreamExecutionEnvironment.getExecutionEnvironment
 6
 7    env
 8      .fromElements("one")
 9      .map(str =>
10      {
11        val obj = MyObject(mutable.LinkedHashMap("key" -> str))
12        val filteredMap1:mutable.LinkedHashMap[String, String] = obj.values.filter(!_._2.contains("bla"))
13
14        obj
15      })
16      .map(obj =>
17      {
18        val filteredMap2:mutable.LinkedHashMap[String, String] = obj.values.filter(!_._2.contains("bla"))
19
20        obj
21      })

The application crashes in line 18 with the exception:

Caused by: java.lang.ClassCastException: scala.collection.mutable.HashMap cannot be cast to scala.collection.mutable.LinkedHashMap

The issue seems to be that the values member changes its runtime type during serialization/deserialization. In other words, the LinkedHashMap turns into a HashMap.
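The type change can be modelled without Flink. The sketch below is a simplified illustration, not Flink's or Chill's actual code. It assumes that a generic map serializer rebuilds any mutable map through the default mutable.Map builder, which produces a HashMap. The names GenericRoundTripDemo and genericRoundTrip are hypothetical.

```scala
import scala.collection.mutable

object GenericRoundTripDemo {
  // Hypothetical stand-in for a serializer that only knows "some mutable.Map":
  // it copies the entries into the default mutable.Map, whose concrete class is HashMap.
  def genericRoundTrip[K, V](in: mutable.Map[K, V]): mutable.Map[K, V] = {
    val out = mutable.Map.empty[K, V]
    in.foreach { case (k, v) => out.put(k, v) }
    out
  }

  def main(args: Array[String]): Unit = {
    val original = mutable.LinkedHashMap("key" -> "one")
    val restored = genericRoundTrip(original)

    println(restored.getClass.getName) // scala.collection.mutable.HashMap

    // The runtime class is no longer a LinkedHashMap, which is what the
    // ClassCastException in the question reports.
    println(restored.isInstanceOf[mutable.LinkedHashMap[_, _]]) // false
  }
}
```

Because HashMap is not a subclass of LinkedHashMap, any code that expects the field to hold a LinkedHashMap fails once the map has gone through such a round trip.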

Note that the same code as in line 18 works perfectly in line 12.

When I set a breakpoint at line 12, the debugger (IntelliJ) shows obj.values as a LinkedHashMap. With a breakpoint at line 18, however, the debugger shows obj.values as a HashMap.

What is going on here? How can I fix this? After all, LinkedHashMap implements Serializable?!
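Implementing Serializable only matters for Java serialization. As a quick check outside Flink, a plain java.io round trip keeps the LinkedHashMap class intact. The sketch below assumes that the type is lost only because Flink handles this record as a generic type with Kryo (and Chill's collection serializers, as the answer below mentions), which do not go through Java serialization. The names JavaSerializationDemo and javaRoundTrip are hypothetical.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable

object JavaSerializationDemo {
  // Round-trips a value through plain Java serialization (ObjectOutputStream/ObjectInputStream).
  def javaRoundTrip[T](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()

    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T]
    finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val restored = javaRoundTrip(mutable.LinkedHashMap("key" -> "one"))
    // Java serialization preserves the concrete collection class.
    println(restored.getClass.getName) // scala.collection.mutable.LinkedHashMap
  }
}
```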

Asked Oct 27 '25 05:10 by user826955


1 Answer

The default Kryo (Chill) serializer used for LinkedHashMap does not preserve the map type and instead deserializes the data into a HashMap. To avoid this, create a dedicated serializer for the LinkedHashMap type:

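import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import scala.collection.mutable

// Kryo serializer that keeps the concrete LinkedHashMap type and its insertion order.
// Flink requires registered serializer instances to be Serializable.
class LinkedHashMapSerializer[K, V] extends Serializer[mutable.LinkedHashMap[K, V]] with Serializable {
  override def write(kryo: Kryo, output: Output, map: mutable.LinkedHashMap[K, V]): Unit = {
    // Number of entries first, so read() knows how many pairs follow.
    output.writeInt(map.size, true)

    // Entries in iteration (insertion) order; class info is written because K and V are erased.
    for ((key, value) <- map) {
      kryo.writeClassAndObject(output, key)
      kryo.writeClassAndObject(output, value)
    }
  }

  override def read(kryo: Kryo, input: Input, tpe: Class[mutable.LinkedHashMap[K, V]]): mutable.LinkedHashMap[K, V] = {
    // Rebuild explicitly as a LinkedHashMap instead of a generic mutable.Map (HashMap).
    val result = new mutable.LinkedHashMap[K, V]()
    val size = input.readInt(true)
    for (_ <- 1 to size) {
      val key = kryo.readClassAndObject(input).asInstanceOf[K]
      val value = kryo.readClassAndObject(input).asInstanceOf[V]
      result.put(key, value)
    }

    result
  }
}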

Then register it as the Kryo serializer for the LinkedHashMap type:

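val env = StreamExecutionEnvironment.getExecutionEnvironment
// Every LinkedHashMap that Kryo serializes in this job now goes through LinkedHashMapSerializer.
env.registerTypeWithKryoSerializer(classOf[mutable.LinkedHashMap[String, String]], new LinkedHashMapSerializer[String, String]())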
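To see why the LinkedHashMapSerializer above keeps both the type and the order, here is a stdlib-only model of the same layout: the entry count, then each key/value pair in iteration order, read back into an explicitly constructed LinkedHashMap. This is a sketch under stated assumptions: java.io.DataOutputStream and writeUTF stand in for Kryo, so it only handles String keys and values, and the names OrderPreservingFormatDemo, write and read are hypothetical.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import scala.collection.mutable

object OrderPreservingFormatDemo {
  // Same layout as LinkedHashMapSerializer: entry count, then key/value pairs in iteration order.
  // DataOutputStream.writeUTF stands in for Kryo, so only String keys and values are supported.
  def write(map: mutable.LinkedHashMap[String, String]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeInt(map.size)
    for ((k, v) <- map) {
      out.writeUTF(k)
      out.writeUTF(v)
    }
    out.close()
    bytes.toByteArray
  }

  // Rebuilds into a LinkedHashMap explicitly, so both the type and the insertion order survive.
  def read(data: Array[Byte]): mutable.LinkedHashMap[String, String] = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    val result = new mutable.LinkedHashMap[String, String]()
    val size = in.readInt()
    for (_ <- 1 to size) {
      val key = in.readUTF()
      val value = in.readUTF()
      result.put(key, value)
    }
    result
  }

  def main(args: Array[String]): Unit = {
    val original = mutable.LinkedHashMap("c" -> "3", "a" -> "1", "b" -> "2")
    val restored = read(write(original))
    println(restored.keys.mkString(",")) // c,a,b
    println(restored.getClass.getName)   // scala.collection.mutable.LinkedHashMap
  }
}
```

The key design choice is in read: the target collection is chosen by the serializer itself rather than by a generic map builder, which is exactly what the default serializer does not do.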
Answered Oct 29 '25 18:10 by Till Rohrmann