
Flink slot removed exception

Tags:

apache-flink

I am getting the following exception:

org.apache.flink.util.FlinkException: The assigned slot container_1546939492951_0001_01_003659_0 was removed.
at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlot(SlotManager.java:789)
at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.removeSlots(SlotManager.java:759)
at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.internalUnregisterTaskManager(SlotManager.java:951)
at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager.unregisterTaskManager(SlotManager.java:372)
at org.apache.flink.runtime.resourcemanager.ResourceManager.closeTaskManagerConnection(ResourceManager.java:823)
at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:346)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

when running a batch process involving joining two very large datasets.

Here is what I can see in the overview. The failure happened on a task manager that did not receive any input. Oddly, the preceding set of operators (partition -> flat map -> map) did not send anything to that task manager, despite having a rebalance in front of it.

I am running it on EMR. I see that there is a slot.idle.timeout setting. Would that have an effect, and if so, how do I specify it for that job? Can it be done on the command line?

[Image: screenshot of the job overview]

Julien Nioche asked Jan 08 '19 15:01


2 Answers

It's possible that this is a timeout issue, but when this happens to me it's usually because of a failure (e.g. YARN kills the container because it's running beyond pmem or vmem limits). I'd recommend carefully checking the JobManager log and all TaskManager log files.
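As a rough aid for that log check, here is a minimal sketch that scans log lines for YARN's memory-limit kill message. It assumes the diagnostic contains the phrase "running beyond physical memory limits" or "running beyond virtual memory limits" next to a container ID; the exact wording can vary between Hadoop versions. The class name `YarnKillScan`, the helper `findMemoryKills`, and the sample log lines are made up for illustration.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class YarnKillScan {
    // Assumed shape of the YARN diagnostic: a container ID followed later on
    // the same line by "running beyond physical|virtual memory limits".
    private static final Pattern KILL = Pattern.compile(
            "(container_\\w+).*running beyond (physical|virtual) memory limits");

    // Returns one short summary per log line that matches the pattern.
    static List<String> findMemoryKills(List<String> logLines) {
        List<String> hits = new ArrayList<>();
        for (String line : logLines) {
            Matcher m = KILL.matcher(line);
            if (m.find()) {
                hits.add(m.group(1) + " exceeded " + m.group(2) + " memory");
            }
        }
        return hits;
    }

    public static void main(String[] args) throws IOException {
        List<String> lines = new ArrayList<>();
        if (args.length > 0) {
            // Each argument is treated as the path of a log file to scan.
            for (String path : args) {
                lines.addAll(Files.readAllLines(Paths.get(path)));
            }
        } else {
            // Hypothetical sample lines, not taken from a real job.
            lines.add("INFO  Some unrelated log line");
            lines.add("Container [pid=1234,containerID=container_0000000000000_0001_01_000002]"
                    + " is running beyond physical memory limits. Killing container.");
        }
        findMemoryKills(lines).forEach(System.out::println);
    }
}
```

If this reports a container whose ID matches the one in the "slot was removed" exception, the removal was most likely caused by a memory kill rather than an idle timeout.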

kkrugler answered Sep 30 '22 21:09


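You can add the following line to your Java code (it applies to streaming jobs that have checkpointing enabled):

env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

With this setting, the latest checkpoint is retained when the job is cancelled, so you can resume the job from it later. It does not restart the job automatically.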

Aarya answered Sep 30 '22 22:09