Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Flink fails on startup on Java 10. TaskManager: java.lang.ClassCastException: [B cannot be cast to [C

Background

When launching Flink, I get an immediate failure in the logs, and subsequent attempts to run an application all fail. I have narrowed down the failure to being Java 10 specific, possibly with other preconditions.

Repro

  1. Download and unpack Flink 1.4.2 from the official download page.
  2. Switch to Java 10. My specific version information is in (1).
  3. Start Flink with ./bin/start-local.sh.
  4. Observe the TaskManager logs. For example, cat ./log/flink-ming-taskmanager-0-C02TJ6XBG8WN.log. There will be a large amount of error output, as in (2).
  5. Run any application. For example, ./bin/flink run ~/git/ming-data-frameworks-poc/flinkpoc/target/scala-2.11/flinkpoc_2.11-0.1-SNAPSHOT.jar --port 900. My example application is in (3), but as far as I know, this happens with any application. It should fail with an error, as in (4)
  6. Stop Flink with ./bin/stop-local.sh.
  7. Empty the Flink logs directory.
  8. Switch to Java 8. My specific version information is in (5).
  9. Repeat steps 3 - 7.
  10. It should show a success message, as in (6). It should not fail with an error. The TaskManager logs should not contain the error output in step 4.

Questions

Is this a known incompatibility? It is not documented as such. In particular, the official quick start documentation says:

Flink runs on Linux, Mac OS X, and Windows. To be able to run Flink, the only requirement is to have a working Java 7.x (or higher) installation.

How can this issue be fixed or worked around on Java 10?

Is this a bug in Flink? Should it be reported upstream to the project?

Thank you in advance for any advice or help you may have.

Supplemental information

(1)

$ uname -a
Darwin C02TJ6XBG8WN 16.7.0 Darwin Kernel Version 16.7.0: Tue Jan 30 11:27:06 PST 2018; root:xnu-3789.73.11~1/RELEASE_X86_64 x86_64

$ java -version
java version "10.0.1" 2018-04-17
Java(TM) SE Runtime Environment 18.3 (build 10.0.1+10)
Java HotSpot(TM) 64-Bit Server VM 18.3 (build 10.0.1+10, mixed mode)

(2)

2018-05-11 17:19:05,353 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@localhost:6123] has failed, address is now gated for [5000] ms. Reason: [[B cannot be cast to [C] 
2018-05-11 17:19:10,428 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@localhost:6123] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 
2018-05-11 17:19:23,111 ERROR akka.remote.Remoting                                          - [B cannot be cast to [C
java.lang.ClassCastException: [B cannot be cast to [C
    at akka.remote.artery.FastHash$.ofString(LruBoundedCache.scala:18)
    at akka.remote.serialization.ActorRefResolveCache.hash(ActorRefResolveCache.scala:61)
    at akka.remote.serialization.ActorRefResolveCache.hash(ActorRefResolveCache.scala:55)
    at akka.remote.artery.LruBoundedCache.getOrCompute(LruBoundedCache.scala:110)
    at akka.remote.RemoteActorRefProvider.resolveActorRef(RemoteActorRefProvider.scala:403)
    at akka.actor.SerializedActorRef.readResolve(ActorRef.scala:433)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:564)
    at java.base/java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1250)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2087)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1585)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2346)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2240)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1585)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:328)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:328)
    at akka.serialization.Serialization.akka$serialization$Serialization$$deserializeByteArray(Serialization.scala:156)
    at akka.serialization.Serialization$$anonfun$deserialize$2.apply(Serialization.scala:142)
    at scala.util.Try$.apply(Try.scala:192)
    at akka.serialization.Serialization.deserialize(Serialization.scala:136)
    at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:30)
    at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:64)
    at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:64)
    at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:82)
    at akka.remote.EndpointReader$$anonfun$akka$remote$EndpointReader$$deliverAndAck$1.apply(Endpoint.scala:1047)
    at akka.remote.EndpointReader$$anonfun$akka$remote$EndpointReader$$deliverAndAck$1.apply(Endpoint.scala:1046)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at akka.remote.EndpointReader.akka$remote$EndpointReader$$deliverAndAck(Endpoint.scala:1046)
    at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:980)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:446)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

(3)

build.sbt

name := "flinkpoc"

version := "0.1-SNAPSHOT"

scalaVersion := "2.11.8"

val flinkVersion = "1.4.2"

libraryDependencies := Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided"
)

SimpleJob.scala

package flinkpoc

import org.apache.flink.streaming.api.scala._

object SimpleJob {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val inStream = env.fromCollection(Range(0, 100).toList)
    val outStream = inStream.map(_ * 100)
    outStream.print

    env.execute("flinkpoc.SimpleJob")
  }

}

(4)

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)
    at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:638)
    at flinkpoc.SimpleJob$.main(SimpleJob.scala:14)
    at flinkpoc.SimpleJob.main(SimpleJob.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:564)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:396)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054)
    at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101)
    at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098)
    at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
    at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:300)
    at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:387)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:481)
    ... 20 more
Caused by: org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException: Lost connection to the JobManager.
    at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:219)
    at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:104)
    at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:71)
    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

(5)

$ java -version
java version "1.8.0_172"
Java(TM) SE Runtime Environment (build 1.8.0_172-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.172-b11, mixed mode)

(6)

Program execution finished
Job with JobID b41e26f012480d2842cbb8bde0098180 has finished.
Job Runtime: 379 ms
like image 595
Ming Avatar asked Mar 07 '23 05:03

Ming


1 Answers

The documentation is out of date. Flink 1.4 dropped support for Java 7, and Java versions 9 and 10 aren't supported yet. You'll need to use Java 8.

Update:

Flink-8033 is the Jira issue tracking Java 9 support.

like image 107
David Anderson Avatar answered Mar 08 '23 22:03

David Anderson