Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Flink throws Kryo error due to Avro array types

I'm getting the following error from the getProducedType method in my Flink deserializer:

com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
values (org.apache.avro.generic.GenericData$Record)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:355)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:85)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:152)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:624)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at org.apache.avro.generic.GenericData$Array.add(GenericData.java:277)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:378)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:289)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)

Deserializer:

class AvroDeserializer[T <: GenericRecord : ClassTag](topic: String, schemaRegistryUrl: String) extends KeyedDeserializationSchema[T] {

  @transient lazy val keyDeserializer: KafkaAvroDeserializer = {
    val deserializer = new KafkaAvroDeserializer()
    deserializer.configure(
      Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
      true)
    deserializer
  }

  // Flink needs the serializer to be serializable => this "@transient lazy val" does the trick
  @transient lazy val valueDeserializer: KafkaAvroDeserializer = {
    val deserializer = new KafkaAvroDeserializer()
    deserializer.configure(
      // other schema-registry configuration parameters can be passed, see the configure() code
      // for details (among other things, schema cache size)
      Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
      false)
    deserializer
  }

  override def deserialize(messageKey: Array[Byte], message: Array[Byte],
                           topic: String, partition: Int, offset: Long): T = {
    valueDeserializer.deserialize(topic, message).asInstanceOf[T]
  }

  override def isEndOfStream(nextElement: T): Boolean = false

  override def getProducedType: TypeInformation[T] = {
   TypeExtractor.getForClass(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
  }

}

From what I've read Kryo has some issues with array types which my message does have. If this is actually true then how can I deserialize my Kafka message into a GenericRecord?

like image 923
moku Avatar asked Jan 26 '26 15:01

moku


1 Answers

Had this problem myself earlier. This is due to the fact that the Kryo parser can't serialize avro types correctly.

To fix this, you can include the flink-avro library to your project as detailed here: Avro support in Flink

After doing this, Flink should now use a special parser for avro types automatically.

If this isn't the case, you might consider trying to set the option enableForceAvro() as described in Execution Configuration

like image 65
RemiM Avatar answered Jan 29 '26 13:01

RemiM



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!