Connection reset by peer while running Apache Spark Job

We have two HDP clusters set up; let's call them A and B.

CLUSTER A NODES :

  • It contains a total of 20 commodity machines.
  • There are 20 data nodes.
  • As namenode HA is configured, there is one active and one standby namenode.

CLUSTER B NODES :

  • It contains a total of 5 commodity machines.
  • There are 5 datanodes.
  • No HA is configured; this cluster has one primary and one secondary namenode.

We have three major components in our application that perform an ETL (Extract, Transform, and Load) operation on incoming files. I will refer to these components as E, T, and L respectively.

COMPONENT E CHARACTERISTICS :

  • This component is an Apache Spark job and it runs solely on Cluster B.
  • Its job is to pick up files from NAS storage and put them into HDFS in Cluster B.

COMPONENT T CHARACTERISTICS :

  • This component is also an Apache Spark job and it runs on Cluster B.
  • Its job is to pick up the files in HDFS written by component E, transform them, and then write the transformed files to HDFS in Cluster A.

COMPONENT L CHARACTERISTICS :

  • This component is also an Apache Spark job and it runs solely on Cluster A.
  • Its job is to pick up files written by component T and load the data into Hive tables present in Cluster A.

Component L is the gem of the three components, and we have not faced any glitches with it. There were minor unexplained glitches in component E, but component T is the most troublesome one.

Components E and T both make use of the DFS client to communicate with the namenode.
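
For context, the cross-cluster write path in components E and T boils down to something like the following minimal sketch (purely illustrative; the URI, path and class name are placeholders, not our actual code). Every FileSystem call here goes through the DFS client, which talks to the active namenode of cluster A over Hadoop RPC:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CrossClusterWriteSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Obtain a DFS client bound to cluster A's active namenode (placeholder URI).
            FileSystem clusterA = FileSystem.get(
                    URI.create("hdfs://clusterA.namenode.com:8020"), conf);

            // create() and close() each translate into RPC calls to the namenode
            // (create, addBlock, complete, ...); the stack trace below fails in complete().
            try (FSDataOutputStream out = clusterA.create(new Path("/data/transformed/part-00000"))) {
                out.writeBytes("transformed record\n");
            }
        }
    }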

The following is an excerpt of the exception that we have observed intermittently while running component T:

clusterA.namenode.com/10.141.160.141:8020. Trying to fail over immediately.
java.io.IOException: Failed on local exception: java.io.IOException: Connection reset by peer; Host Details : local host is: "clusterB.datanode.com"; destination host is: "clusterA.namenode.com":8020;
            at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:782)
            at org.apache.hadoop.ipc.Client.call(Client.java:1459)
            at org.apache.hadoop.ipc.Client.call(Client.java:1392)
            at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
            at com.sun.proxy.$Proxy15.complete(Unknown Source)
            at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:464)
            at sun.reflect.GeneratedMethodAccessor1240.invoke(Unknown Source)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258)
            at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
            at com.sun.proxy.$Proxy16.complete(Unknown Source)
            at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2361)
            at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2338)
            at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2303)
            at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
            at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
            at org.apache.hadoop.io.compress.CompressorStream.close(CompressorStream.java:109)
            at sun.nio.cs.StreamEncoder.implClose(StreamEncoder.java:320)
            at sun.nio.cs.StreamEncoder.close(StreamEncoder.java:149)
            at java.io.OutputStreamWriter.close(OutputStreamWriter.java:233)
            at com.abc.xyz.io.CounterWriter.close(CounterWriter.java:34)
            at com.abc.xyz.common.io.PathDataSink.close(PathDataSink.java:47)
            at com.abc.xyz.diamond.parse.map.node.AbstractOutputNode.finalise(AbstractOutputNode.java:142)
            at com.abc.xyz.diamond.parse.map.application.spark.node.SparkOutputNode.finalise(SparkOutputNode.java:239)
            at com.abc.xyz.diamond.parse.map.DiamondMapper.onParseComplete(DiamondMapper.java:1072)
            at com.abc.xyz.diamond.parse.decode.decoder.DiamondDecoder.parse(DiamondDecoder.java:956)
            at com.abc.xyz.parsing.functions.ProcessorWrapper.process(ProcessorWrapper.java:96)
            at com.abc.xyz.parser.FlumeEvent2AvroBytes.call(FlumeEvent2AvroBytes.java:131)
            at com.abc.xyz.parser.FlumeEvent2AvroBytes.call(FlumeEvent2AvroBytes.java:45)
            at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
            at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
            at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
            at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
            at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
            at com.abc.xyz.zzz.ParseFrameHolder$ToKafkaStream.call(ParseFrameHolder.java:123)
            at com.abc.xyz.zzz.ParseFrameHolder$ToKafkaStream.call(ParseFrameHolder.java:82)
            at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
            at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
            at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$35.apply(RDD.scala:927)
            at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$35.apply(RDD.scala:927)
            at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882)
            at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882)
            at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
            at org.apache.spark.scheduler.Task.run(Task.scala:89)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Connection reset by peer
            at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
            at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
            at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
            at sun.nio.ch.IOUtil.read(IOUtil.java:197)
            at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
            at org.apache.hadoop.net.SocketInputStream$Reader.performIO(SocketInputStream.java:57)
            at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
            at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
            at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
            at java.io.FilterInputStream.read(FilterInputStream.java:133)
            at java.io.FilterInputStream.read(FilterInputStream.java:133)
            at org.apache.hadoop.ipc.Client$Connection$PingInputStream.read(Client.java:554)
            at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
            at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
            at java.io.DataInputStream.readInt(DataInputStream.java:387)
            at org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1116)
            at org.apache.hadoop.ipc.Client$Connection.run(Client.java:1011)   

As mentioned, we face this exception very intermittently, and when it occurs our application gets stuck, forcing us to restart it.

SOLUTIONS THAT WE TRIED :

  • Our first suspect was that we were overloading the active namenode in cluster A, since component T opens a lot of DFS clients in parallel and performs file operations on different files (there is no contention on the same files). In our effort to tackle this problem, we looked at two key namenode parameters, dfs.namenode.handler.count and ipc.server.listen.queue.size, and bumped the latter from 128 (the default) to 1024.

  • Unfortunately, the issue still persisted in component T, so we took a different approach and focused solely on finding the reason for the Connection Reset By Peer. According to many articles and Stack Exchange discussions, it means that the peer has set the RST flag, which results in an immediate termination of the connection. In our case we identified that the peer was the namenode of cluster A.

  • Keeping the RST flag in mind, I delved into the internals of TCP communication, specifically the reasons for which the RST flag gets set.

  • Every listening socket on Linux (not BSD) has two queues associated with it, namely the accept queue and the backlog queue.
  • During the TCP handshake, connection requests are kept in the backlog queue until the final ACK packet is received from the node that initiated the connection. Once it is received, the request is transferred to the accept queue, and the application that opened the socket can start receiving packets from the remote client.
  • The size of the backlog queue is controlled by two kernel-level parameters, net.ipv4.tcp_max_syn_backlog and net.core.somaxconn, whereas the application (the namenode in our case) can ask the kernel for whatever queue size it desires, limited by an upper bound (we believe the accept queue size is the queue size defined by ipc.server.listen.queue.size).
  • Also, another interesting thing to note here is that if the backlog value requested by the application via listen() is greater than net.core.somaxconn, it is silently truncated to that value. This claim is based upon the Linux documentation at https://linux.die.net/man/2/listen.
  • Coming back to the point, when the backlog fills up completely, TCP can behave in one of two ways, and this behaviour is controlled by the kernel parameter net.ipv4.tcp_abort_on_overflow. It is set to 0 by default, which causes the kernel to drop any new SYN packets when the backlog is full, letting the sender retransmit them. When set to 1, the kernel sets the RST flag in a packet and sends it to the sender, abruptly terminating the connection (a small Java illustration of these queue mechanics follows this list).

  • We checked the values of the above-mentioned kernel parameters and found that net.core.somaxconn is set to 1024, net.ipv4.tcp_abort_on_overflow is set to 0, and net.ipv4.tcp_max_syn_backlog is set to 4096 across all machines in both clusters.

  • The only suspect we have left is the switches that connect cluster A to cluster B, because none of the machines in either cluster should ever set the RST flag while net.ipv4.tcp_abort_on_overflow is set to 0.
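
To make the queue mechanics above concrete, here is a minimal Java illustration (an assumption-laden sketch, not the actual namenode code): the application passes its desired backlog to the kernel when it starts listening, and the kernel silently caps that value at net.core.somaxconn. We believe the namenode's IPC server does essentially the same thing, using ipc.server.listen.queue.size as the requested backlog.

    import java.net.InetSocketAddress;
    import java.net.ServerSocket;

    public class BacklogSketch {
        public static void main(String[] args) throws Exception {
            int requestedBacklog = 1024;   // stands in for ipc.server.listen.queue.size
            ServerSocket server = new ServerSocket();
            // bind() passes requestedBacklog to the kernel's listen(); the kernel
            // silently truncates it to net.core.somaxconn if it is larger.
            server.bind(new InetSocketAddress("0.0.0.0", 8020), requestedBacklog);

            // Fully established connections wait in the accept queue until the
            // application calls accept(); half-open ones sit in the SYN backlog.
            while (true) {
                server.accept().close();
            }
        }
    }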

MY QUESTIONS

  • It is evident from the HDFS documentation that the DFS client uses RPC to communicate with the namenode for file operations. Does every RPC call involve establishing a new TCP connection to the namenode?
  • Does the parameter ipc.server.listen.queue.size define the length of the accept queue of the socket on which the namenode accepts RPC requests?
  • Can the namenode implicitly close connections to DFS clients when under heavy load, causing the kernel to send a packet with the RST flag set, even though net.ipv4.tcp_abort_on_overflow is set to 0?
  • Are the L2 or L3 switches (used to connect the machines in our two clusters) capable of setting the RST flag when they cannot handle bursty traffic?

Our next approach to this problem is to identify which machine or switch (there is no router involved) is setting the RST flag, by analyzing the packets with tcpdump or Wireshark. We will also bump the sizes of all the queues mentioned above to 4096 in order to handle bursty traffic effectively.
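
For reference, the capture we have in mind looks roughly like this (interface and output file are placeholders); it records only packets to or from the cluster A namenode's RPC port that carry the RST flag:

    tcpdump -i any -nn 'host clusterA.namenode.com and port 8020 and tcp[tcpflags] & (tcp-rst) != 0' -w rst_capture.pcap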

The namenode logs show no sign of any exceptions, except that the namenode connection load, as seen in Ambari, peaked at certain points in time, and not necessarily when the Connection Reset By Peer exception occurred.

To conclude, I wanted to know whether we are on the right track to solve this problem, or are we just going to hit a dead end?

P.S. I apologize for the length of this question. I wanted to present the entire context to readers before asking for any help or suggestions. Thank you for your patience.

asked May 18 '17 by Aniketh Jain


1 Answer

First of all, there may indeed be something odd in your network, and perhaps you will manage to track it down with the steps that you mention.

That being said, looking at the steps, there is something going on that I personally find counterintuitive.

You currently have component T doing both the transformation and the most fragile part, the cross-cluster transport. Perhaps you are seeing worse reliability than people normally do, but I would seriously consider separating the complicated part from the fragile part.

If you do this (or simply split the work into smaller chunks), it should be fairly straightforward to design a solution whose fragile step fails now and then, but which simply retries it when this happens. And of course the retry will come at minimal cost because only a small portion of the work needs to be retried.
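
As a rough idea of what that could look like, here is a minimal retry wrapper (purely illustrative; the attempt count, backoff, and the notion of a "fragile step" are placeholders you would adapt to your pipeline):

    public final class RetrySketch {
        /** Runs the fragile step, retrying a few times when it fails transiently. */
        static void runWithRetry(Runnable fragileStep, int maxAttempts) throws InterruptedException {
            for (int attempt = 1; ; attempt++) {
                try {
                    fragileStep.run();
                    return;                          // success, nothing more to do
                } catch (RuntimeException e) {
                    if (attempt >= maxAttempts) {
                        throw e;                     // give up after the last attempt
                    }
                    Thread.sleep(1000L * attempt);   // simple linear backoff before retrying
                }
            }
        }
    }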


To conclude: it may help to troubleshoot your connection problems, but if at all possible, you may want to design for intermittent failure instead.

answered Oct 24 '22 by Dennis Jaheruddin