
Apache Spark: network errors between executors

I'm running Apache Spark 1.3.1 with Scala 2.11.2 on an HPC cluster. With large enough data, I get numerous errors like the ones at the bottom of this post, repeated multiple times per second until the job is killed for exceeding its time limit. Judging by the errors, the executor is trying to fetch shuffle data from other nodes but is unable to do so.

The same program runs fine either (a) with a smaller amount of data or (b) in local-only mode, so the problem seems tied to data being sent over the network, and only appears once the data is large enough.

The code that is being executed around the time this happens is as follows:

val partitioned_data = data  // data was read as sc.textFile(inputFile)
  .zipWithIndex.map(x => (x._2, x._1))
  .partitionBy(partitioner)  // A custom partitioner
  .map(_._2)

// Force previous lazy operations to be evaluated. Presumably adds some
// overhead, but hopefully the minimum possible...
// Suggested on Spark user list: http://apache-spark-user-list.1001560.n3.nabble.com/Forcing-RDD-computation-with-something-else-than-count-td707.html
sc.runJob(partitioned_data, (iter: Iterator[_]) => {})

Is this indicative of a bug, or is there something I'm doing wrong?

Here's a small snippet of the stderr log of one of the executors (full log is here):

15/04/21 14:59:28 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=1601401593000, chunkIndex=0}, buffer=FileSegmentManagedBuffer{file=/tmp/spark-0f8d0598-b137-4d14-993a-568b2ab3709a/spark-12d5ff0a-2793-4b76-8a0b-d977a5924925/spark-7ad9382d-05cf-49d4-9a52-d42e6ca7117d/blockmgr-b72d4068-d065-47e6-8a10-867f723000db/15/shuffle_0_1_0.data, offset=26501223, length=6227612}} to /10.0.0.5:41160; closing connection
java.io.IOException: Resource temporarily unavailable
    at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
    at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:415)
    at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:516)
    at org.apache.spark.network.buffer.LazyFileRegion.transferTo(LazyFileRegion.java:96)
    at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:89)
    at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:237)
    at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:233)
    at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:264)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:707)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:315)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:676)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1059)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:688)
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:669)
    at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:688)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:718)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:706)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:741)
    at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:895)
    at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:240)
    at org.apache.spark.network.server.TransportRequestHandler.respond(TransportRequestHandler.java:147)
    at org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:119)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:95)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    at java.lang.Thread.run(Thread.java:619)
15/04/21 14:59:28 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=1601401593000, chunkIndex=1}, buffer=FileSegmentManagedBuffer{file=/tmp/spark-0f8d0598-b137-4d14-993a-568b2ab3709a/spark-12d5ff0a-2793-4b76-8a0b-d977a5924925/spark-7ad9382d-05cf-49d4-9a52-d42e6ca7117d/blockmgr-b72d4068-d065-47e6-8a10-867f723000db/27/shuffle_0_5_0.data, offset=3792987, length=2862285}} to /10.0.0.5:41160; closing connection
java.nio.channels.ClosedChannelException
15/04/21 14:59:28 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=1601401593002, chunkIndex=0}, buffer=FileSegmentManagedBuffer{file=/tmp/spark-0f8d0598-b137-4d14-993a-568b2ab3709a/spark-12d5ff0a-2793-4b76-8a0b-d977a5924925/spark-7ad9382d-05cf-49d4-9a52-d42e6ca7117d/blockmgr-b72d4068-d065-47e6-8a10-867f723000db/15/shuffle_0_1_0.data, offset=0, length=10993212}} to /10.0.0.6:42426; closing connection
java.io.IOException: Resource temporarily unavailable
    at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
    at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:415)
    at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:516)
    at org.apache.spark.network.buffer.LazyFileRegion.transferTo(LazyFileRegion.java:96)
    at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:89)
    at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:237)
    at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:233)
    at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:264)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:707)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:315)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:676)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1059)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:688)
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:669)
    at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:688)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:718)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:706)
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:741)
    at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:895)
    at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:240)
    at org.apache.spark.network.server.TransportRequestHandler.respond(TransportRequestHandler.java:147)
    at org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:119)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:95)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    at java.lang.Thread.run(Thread.java:619)
15/04/21 14:59:28 WARN TransportChannelHandler: Exception in connection from node5.someuniversity.edu/10.0.0.5:60089
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcher.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:233)
    at sun.nio.ch.IOUtil.read(IOUtil.java:206)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:236)
    at io.netty.buffer.PooledHeapByteBuf.setBytes(PooledHeapByteBuf.java:234)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    at java.lang.Thread.run(Thread.java:619)
15/04/21 14:59:28 ERROR TransportResponseHandler: Still have 2 requests outstanding when connection from node5.someuniversity.edu/10.0.0.5:60089 is closed
15/04/21 14:59:28 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 2 outstanding blocks after 5000 ms
Asked by mjjohnson, Apr 21 '15 19:04


1 Answer

This appears to be a bug in the Netty-based networking layer (the block transfer service) added in Spark 1.2. Adding .set("spark.shuffle.blockTransferService", "nio") to my SparkConf made the errors go away, and everything now works.
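For context, here is a minimal sketch of where that setting goes. It assumes Spark 1.3.x and a job launched through spark-submit (so the master URL is supplied externally); the object name and app name are placeholders of mine, not taken from the original program.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver object; only the .set(...) line comes from the answer above.
object NioShuffleExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("nio-shuffle-example") // placeholder app name
      // Use the older NIO-based block transfer service instead of Netty.
      .set("spark.shuffle.blockTransferService", "nio")

    val sc = new SparkContext(conf)
    try {
      // ... build and run the job here ...
    } finally {
      sc.stop() // release cluster resources even if the job fails
    }
  }
}
```

The same key can presumably also be passed at launch time, e.g. with --conf spark.shuffle.blockTransferService=nio on spark-submit, which avoids recompiling. Note that this is a Spark 1.x workaround; whether the option exists in the version you run is worth checking against its configuration docs.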

I found a post on the spark-user mailing list from someone who was running into similar errors, and they suggested trying nio instead of Netty.

SPARK-5085 is similar in that switching from Netty to nio fixed their issue; however, they were also able to fix it by changing some networking settings. (I haven't tried this myself yet, since I'm not sure I have the access privileges to do so on the cluster.)

Answered by mjjohnson, Oct 07 '22 15:10