 

Flink job started from another program on YARN fails with "JobClientActor seems to have died"

I'm a new Flink user and I have the following problem. I use Flink on a YARN cluster to transfer related data extracted from an RDBMS to HBase. I wrote a Flink batch application in Java with multiple ExecutionEnvironments (one per RDB table, to transfer that table's rows in parallel) and transfer the tables one by one sequentially (because the call to env.execute() is blocking).
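The structure described above can be sketched roughly as follows. The table names and the transferTable helper are hypothetical; in the real program each call would build an ExecutionEnvironment, wire a JDBC source to an HBase sink, and invoke the blocking env.execute():

```java
import java.util.Arrays;
import java.util.List;

public class SequentialTransfer {

    // Hypothetical stand-in for building and running one table's Flink batch
    // job. In the real application this creates an ExecutionEnvironment and
    // calls env.execute(), which blocks until the job finishes on YARN.
    static String transferTable(String table) {
        return "done: " + table;
    }

    public static void main(String[] args) {
        // Hypothetical table list; each table becomes its own Flink job.
        List<String> tables = Arrays.asList("orders", "customers", "items");
        for (String table : tables) {
            // Sequential by design: the next job is only submitted after
            // the blocking execute() for the previous table returns.
            System.out.println(transferTable(table));
        }
    }
}
```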

I start the YARN session like this:

export YARN_CONF_DIR=/etc/hadoop/conf
export FLINK_HOME=/opt/flink-1.3.1
export FLINK_CONF_DIR=$FLINK_HOME/conf
$FLINK_HOME/bin/yarn-session.sh -n 1 -s 4 -d -jm 2048 -tm 8096

Then I run my application on the started YARN session via a shell script, transfer.sh. Its content is:

#!/bin/bash

export YARN_CONF_DIR=/etc/hadoop/conf
export FLINK_HOME=/opt/flink-1.3.1
export FLINK_CONF_DIR=$FLINK_HOME/conf
$FLINK_HOME/bin/flink run -p 4 transfer.jar

When I start this script manually from the command line, it works fine - jobs are submitted to the YARN session one by one without errors.

Now I need to run this script from another Java program. For this I use

Runtime.getRuntime().exec("transfer.sh");

(Maybe there are better ways to do this? I have looked at the REST API, but there are some difficulties because the job manager is proxied by YARN.) At first it works as usual - the first several jobs are submitted to the session and finish successfully. But the following jobs are not submitted to the YARN session. In /opt/flink-1.3.1/log/flink-tsvetkoff-client-hadoop-dev1.log I see this error (and no other errors, even at DEBUG level):

The program execution failed: JobClientActor seems to have died before the JobExecutionResult could be retrieved.

I have tried to analyse this problem myself and found that the error occurs in the JobClient class while sending a ping request with a timeout to the JobClientActor (i.e. the YARN cluster). I tried increasing multiple heartbeat and timeout options, like akka.*.timeout, akka.watch.heartbeat.* and yarn.heartbeat-delay, but it doesn't solve the problem - new jobs are still not submitted to the YARN session from CliFrontend.
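When debugging a launch like this, it can help to capture everything the child process prints, since with plain Runtime.exec the child's stdout/stderr are pipes that the parent is expected to consume. A minimal sketch of launching the script while draining its output (the echo command is just a placeholder for transfer.sh):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class ScriptRunner {
    /**
     * Launches a command and drains its combined stdout/stderr, so the
     * child can never block on a full pipe buffer. Returns the exit code.
     */
    static int runAndDrain(String... command) throws IOException, InterruptedException {
        Process p = new ProcessBuilder(command)
                .redirectErrorStream(true)   // merge stderr into stdout
                .start();
        try (BufferedReader r = new BufferedReader(
                new InputStreamReader(p.getInputStream()))) {
            String line;
            while ((line = r.readLine()) != null) {
                System.out.println(line);    // forward the CLI's output
            }
        }
        return p.waitFor();
    }

    public static void main(String[] args) throws Exception {
        // In this question's setup the call would be: runAndDrain("transfer.sh");
        int code = runAndDrain("sh", "-c", "echo submitting jobs");
        System.out.println("exit: " + code);
    }
}
```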

The environment in both cases (manual call and call from another program) is the same. When I call

$ ps axu | grep transfer

it gives this output:

/usr/lib/jvm/java-8-oracle/bin/java -Dlog.file=/opt/flink-1.3.1/log/flink-tsvetkoff-client-hadoop-dev1.log -Dlog4j.configuration=file:/opt/flink-1.3.1/conf/log4j-cli.properties -Dlogback.configurationFile=file:/opt/flink-1.3.1/conf/logback.xml -classpath /opt/flink-1.3.1/lib/flink-metrics-graphite-1.3.1.jar:/opt/flink-1.3.1/lib/flink-python_2.11-1.3.1.jar:/opt/flink-1.3.1/lib/flink-shaded-hadoop2-uber-1.3.1.jar:/opt/flink-1.3.1/lib/log4j-1.2.17.jar:/opt/flink-1.3.1/lib/slf4j-log4j12-1.7.7.jar:/opt/flink-1.3.1/lib/flink-dist_2.11-1.3.1.jar:::/etc/hadoop/conf org.apache.flink.client.CliFrontend run -p 4 transfer.jar

I also tried updating Flink to the 1.4.0 release and changing the parallelism of the job (even to -p 1), but the error still occurred.

I have no idea what could be different. Is there any workaround?

Thank you for any help.

Vitaly Tsvetkoff asked Jan 16 '18 10:01

1 Answer

Finally I found out how to resolve this error: just replace Runtime.exec(...) with new ProcessBuilder(...).inheritIO().start().

I don't really know why the call to inheritIO() helps in this case, because as I understand it, it just redirects the IO streams of the child process to the parent process. But I have verified that if I comment out this call, the program starts to fail again.
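A plausible explanation (not confirmed from the logs here): with plain Runtime.exec, the child gets piped stdout/stderr that nobody reads, and once an OS pipe buffer fills up, the CliFrontend process blocks on a write before it finishes submitting the job; inheritIO() avoids the pipes entirely by attaching the child directly to the parent's streams. A minimal sketch of the working launcher, with a placeholder command instead of transfer.sh:

```java
import java.io.IOException;

public class TransferLauncher {
    /** Starts a command with the child's stdio attached to this JVM's stdio. */
    static int run(String... command) throws IOException, InterruptedException {
        Process p = new ProcessBuilder(command)
                .inheritIO()   // child writes straight to our stdout/stderr: no pipe to fill
                .start();
        return p.waitFor();    // block until the launched script finishes
    }

    public static void main(String[] args) throws Exception {
        // In the answer's setup this would be: run("transfer.sh");
        int code = run("sh", "-c", "echo ok");
        System.out.println("exit: " + code);
    }
}
```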

Vitaly Tsvetkoff answered Sep 29 '22 12:09