Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Flink 1.4 AvroUtils error

I try to submit a job on Flink 1.4 and getting the following exception.

Any idea how to solve the problem?

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.VerifyError: Bad type on operand stack
Exception Details:
  Location:
    org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.addAvroGenericDataArrayRegistration(Ljava/util/LinkedHashMap;)V @23: invokespecial
  Reason:
Type 'org/apache/flink/api/java/typeutils/runtime/kryo/Serializers$SpecificInstanceCollectionSerializerForArrayList' (current frame, stack[7]) is not assignable to 'com/esotericsoftware/kryo/Serializer'
  Current Frame:
bci: @23
flags: { }
locals: { 'org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils', 'java/util/LinkedHashMap' }
stack: { 'java/util/LinkedHashMap', 'java/lang/String', uninitialized 6, uninitialized 6, 'java/lang/Class', uninitialized 12, uninitialized 12, 'org/apache/flink/api/java/typeutils/runtime/kryo/Serializers$SpecificInstanceCollectionSerializerForArrayList' }
   Bytecode:
0x0000000: 2b12 05b6 000b bb00 0c59 1205 bb00 0d59
0x0000010: bb00 0659 b700 0eb7 000f b700 10b6 0011
0x0000020: 57b1                                   

at java.lang.Class.getDeclaredConstructors0(Native Method)
at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
at java.lang.Class.getConstructor0(Class.java:3075)
at java.lang.Class.getConstructor(Class.java:1825)
at org.apache.flink.api.java.typeutils.AvroUtils.getAvroUtils(AvroUtils.java:48)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.buildKryoRegistrations(KryoSerializer.java:481)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.<init>(KryoSerializer.java:119)
at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:90)
at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:107)
at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:52)
at org.apache.flink.api.java.typeutils.ListTypeInfo.createSerializer(ListTypeInfo.java:102)
at org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:253)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:520)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getUnionListState(DefaultOperatorStateBackend.java:165)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:692)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
like image 273
Nick Toker Avatar asked Dec 21 '17 15:12

Nick Toker


2 Answers

It's a conflict between the Kryo lib into the job jar and the one already loaded in Flink.

Indeed, Serializers$SpecificInstanceCollectionSerializerForArrayList inherits from com.esotericsoftware.kryo.Serializer through SpecificInstanceCollectionSerializer and CollectionSerializer. (see Serializers.java in Flink and CollectionSerializer.java in Kryo).

We fix it by excluding the Kryo lib from the job Jar. In Gradle build.gradle, it is done with (if you use ShadowJar plugin):

configurations {
  runtime.exclude module: 'kryo' , group: 'com.esotericsoftware.kryo'
}

In Maven pom.xml, it is done with (if you use shade plugin):

<exclude>com.esotericsoftware.kryo:kryo</exclude>
like image 184
toch Avatar answered Sep 22 '22 04:09

toch


thanks to "toch" on previous answer - he gave me a direction for the solution

<exclude>com.esotericsoftware.kryo:kryo</exclude> 

was already excluded in our pom.xml

I notice that kryo-shaded was included

I excluded by add to the pom

<exclude>com.esotericsoftware:kryo-shaded</exclude>

and it solve the problem

like image 28
Nick Toker Avatar answered Sep 26 '22 04:09

Nick Toker