
Why Spark application on YARN fails with FetchFailedException due to Connection refused?

I am using Spark 1.6.3 with YARN 2.7.1.2.3, which ships with HDP-2.3.0.0-2557. Because the Spark version bundled with my HDP release is too old, I prefer to run a separate Spark installation remotely in YARN mode.

Here is how I run the Spark shell:

./spark-shell --master yarn-client

Everything seems fine: sparkContext and sqlContext are initialized, and I can even access my Hive tables. But in some cases it runs into trouble when it tries to connect to the block managers.

I am not an expert, but I think that when I run in YARN mode, the block managers run on my YARN cluster. At first it looked like a network problem to me, so I did not want to ask about it here. But it happens only in some cases, which I have not been able to pin down yet, and that makes me think it may not be a network problem.

Here is the code:

val df = sqlContext.sql("select * from city_table")

The code below works fine:

df.limit(10).count()

But when the size is more than 10 (I don't know the exact number; it changes on every run):

df.count()

This raises an exception:

16/12/30 07:31:04 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 2 is 157 bytes
16/12/30 07:31:19 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 8, 172.27.247.204): FetchFailed(BlockManagerId(2, 172.27.247.204, 56093), shuffleId=2, mapId=0, reduceId=0, message=
org.apache.spark.shuffle.FetchFailedException: Failed to connect to /172.27.247.204:56093
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:323)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:504)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:686)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to connect to /172.27.247.204:56093
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)
    at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:90)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    ... 3 more
Caused by: java.net.ConnectException: Connection refused: /172.27.247.204:56093
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    ... 1 more

)

I have just realized that this happens when there is more than one task to shuffle.

What is the problem? Is it a performance issue, or another network issue that I could not see? What is this shuffling? If it is a network issue, is it between my Spark and YARN, or a problem in YARN itself?

Thank you.

Edited:

I just saw something in the logs:

17/01/02 06:45:17 INFO DAGScheduler: Executor lost: 2 (epoch 13)
17/01/02 06:45:17 INFO BlockManagerMasterEndpoint: Trying to remove executor 2 from BlockManagerMaster.
17/01/02 06:45:17 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(2, 172.27.247.204, 51809)
17/01/02 06:45:17 INFO BlockManagerMaster: Removed 2 successfully in removeExecutor
17/01/02 06:45:17 INFO YarnScheduler: Removed TaskSet 3.0, whose tasks have all completed, from pool
17/01/02 06:45:24 INFO BlockManagerMasterEndpoint: Registering block manager 172.27.247.204:51809 with 511.1 MB RAM, BlockManagerId(2, 172.27.247.204, 51809)

Sometimes retrying on another block manager works. But most of the time the job never finishes, because the maximum allowed number of failures (4 by default) is exceeded.

Edited 2:

YARN is really silent about this, but I think it is a network issue. I was able to narrow the problem down:

This Spark installation is deployed outside of the HDP environment. When Spark submits an application to YARN, YARN informs the Spark driver about the block managers and executors. The executors run on data nodes in the HDP cluster, and each has its own IP in the cluster's private network. But when YARN informs the Spark driver outside the cluster, it always gives the same single IP for all executors. This is because all nodes in the HDP cluster go out through a router and share the same IP. Assume that IP is 150.150.150.150: when the Spark driver needs to connect to those executors and ask them for something, it uses this IP. But this IP is actually the outer IP address of the whole cluster, not the IP of an individual data node.

Is there a way to make YARN report the executors (block managers) with their private IPs? Their private IPs are also reachable from the machine where the Spark driver runs.

Ahmet DAL asked Dec 30 '16 07:12


1 Answer

A FetchFailedException is thrown when a reducer task (for a ShuffleDependency) cannot fetch shuffle blocks. It usually means that the executor (with the BlockManager for the shuffle blocks) died, hence the exception:

Caused by: java.io.IOException: Failed to connect to /172.27.247.204:56093

The executor may have run out of memory (an OutOfMemoryError was thrown), or YARN may have killed it due to excessive memory usage.
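If memory does turn out to be the cause, one thing to try is giving each executor more heap and more YARN memory overhead. The sketch below only prints the command so you can inspect it first. The values 4g and 1024 are assumptions to tune for your cluster, not recommendations, and spark.yarn.executor.memoryOverhead is the Spark 1.x property name (in megabytes).

```shell
# Assumed values; adjust them to what your YARN containers can actually get.
EXECUTOR_MEMORY="4g"        # executor JVM heap
MEMORY_OVERHEAD_MB="1024"   # extra off-heap memory YARN allows per executor, in MB

# Build the spark-shell command line with both settings applied.
SPARK_SHELL_CMD="./spark-shell --master yarn-client --executor-memory $EXECUTOR_MEMORY --conf spark.yarn.executor.memoryOverhead=$MEMORY_OVERHEAD_MB"

# Print the command instead of running it; paste it into your terminal to start the shell.
echo "$SPARK_SHELL_CMD"
```

If executors stop disappearing after this, memory pressure was the likely culprit. If nothing changes, look at the logs for another cause.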

You should review the logs of the Spark application using the yarn logs command to find the root cause of the issue.

yarn logs -applicationId <application ID> [options]
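The aggregated logs can be long, so it may help to filter them for signs that an executor died. The helper below is a hypothetical sketch: the function name is made up, and the patterns are assumptions about messages that typically appear when an executor runs out of memory or YARN kills a container. Extend the list with whatever shows up in your own logs.

```shell
# Hypothetical helper: read log text on stdin and keep only lines that hint
# at an executor running out of memory or being killed by YARN.
# The patterns are assumed typical messages, not an exhaustive list.
find_executor_death_hints() {
  grep -E -i 'OutOfMemoryError|exceeding memory limits|beyond physical memory limits|Killing container'
}

# Usage (replace the placeholder with your application ID):
#   yarn logs -applicationId <application ID> | find_executor_death_hints
```

No output means none of the assumed patterns matched, not that the executors are healthy.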

You could also review the status of your Spark application's executors in the Executors tab of the web UI.

Spark usually recovers from a FetchFailedException by re-running the affected tasks. Use the web UI to see how your Spark application performs. A FetchFailedException could be due to a temporary memory "hiccup".
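If the failures really are transient, giving shuffle fetches more attempts and a longer pause between them might let the job ride them out. This is a sketch under that assumption. spark.shuffle.io.maxRetries and spark.shuffle.io.retryWait are existing Spark properties, but the values 6 and 10s are illustrative guesses. It will not help if the executor is permanently gone or unreachable.

```shell
# Assumed values; to my knowledge the documented defaults are 3 retries and a 5s wait.
SHUFFLE_MAX_RETRIES="6"
SHUFFLE_RETRY_WAIT="10s"

# Build the spark-shell command line with more patient shuffle fetch retries.
SPARK_SHELL_RETRY_CMD="./spark-shell --master yarn-client --conf spark.shuffle.io.maxRetries=$SHUFFLE_MAX_RETRIES --conf spark.shuffle.io.retryWait=$SHUFFLE_RETRY_WAIT"

# Print the command instead of running it, so it can be reviewed first.
echo "$SPARK_SHELL_RETRY_CMD"
```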

Jacek Laskowski answered Oct 08 '22 21:10