We have two HDP clusters set up; let's call them A and B.
CLUSTER A NODES:
CLUSTER B NODES:
We have three major components in our application that perform an ETL (Extract, Transform and Load) operation on incoming files. I will refer to these components as E, T, and L respectively.
COMPONENT E CHARACTERISTICS:
COMPONENT T CHARACTERISTICS:
COMPONENT L CHARACTERISTICS:
Component L is the gem of the three; we have not faced any glitches in it. There have been minor, unexplained glitches in component E, but component T is by far the most troublesome.
Components E and T both use the DFS client to communicate with the NameNode.
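For context, here is a minimal, hypothetical sketch of the kind of DFS-client write that the stack trace below corresponds to (the URI and path are placeholders, not our actual code). The close() at the end is what triggers the final complete() RPC to the NameNode, which is exactly the call that fails in the trace.

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class DfsWriteSketch {
        public static void main(String[] args) throws IOException {
            Configuration conf = new Configuration();
            // The client runs on a cluster B node but writes to cluster A,
            // so every NameNode RPC goes to clusterA.namenode.com:8020.
            conf.set("fs.defaultFS", "hdfs://clusterA.namenode.com:8020");

            FileSystem fs = FileSystem.get(conf);
            Path target = new Path("/data/example/part-00000.gz"); // placeholder path

            try (FSDataOutputStream out = fs.create(target, true)) {
                out.write("example record\n".getBytes(StandardCharsets.UTF_8));
            } // close() -> DFSOutputStream.completeFile() -> NameNode complete() RPC
        }
    }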
The following is an excerpt of the exception that we have observed intermittently while running component T:
clusterA.namenode.com/10.141.160.141:8020. Trying to fail over immediately.
java.io.IOException: Failed on local exception: java.io.IOException: Connection reset by peer; Host Details : local host is: "clusterB.datanode.com"; destination host is: "clusterA.namenode.com":8020;
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:782)
at org.apache.hadoop.ipc.Client.call(Client.java:1459)
at org.apache.hadoop.ipc.Client.call(Client.java:1392)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy15.complete(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:464)
at sun.reflect.GeneratedMethodAccessor1240.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
at com.sun.proxy.$Proxy16.complete(Unknown Source)
at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2361)
at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2338)
at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2303)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
at org.apache.hadoop.io.compress.CompressorStream.close(CompressorStream.java:109)
at sun.nio.cs.StreamEncoder.implClose(StreamEncoder.java:320)
at sun.nio.cs.StreamEncoder.close(StreamEncoder.java:149)
at java.io.OutputStreamWriter.close(OutputStreamWriter.java:233)
at com.abc.xyz.io.CounterWriter.close(CounterWriter.java:34)
at com.abc.xyz.common.io.PathDataSink.close(PathDataSink.java:47)
at com.abc.xyz.diamond.parse.map.node.AbstractOutputNode.finalise(AbstractOutputNode.java:142)
at com.abc.xyz.diamond.parse.map.application.spark.node.SparkOutputNode.finalise(SparkOutputNode.java:239)
at com.abc.xyz.diamond.parse.map.DiamondMapper.onParseComplete(DiamondMapper.java:1072)
at com.abc.xyz.diamond.parse.decode.decoder.DiamondDecoder.parse(DiamondDecoder.java:956)
at com.abc.xyz.parsing.functions.ProcessorWrapper.process(ProcessorWrapper.java:96)
at com.abc.xyz.parser.FlumeEvent2AvroBytes.call(FlumeEvent2AvroBytes.java:131)
at com.abc.xyz.parser.FlumeEvent2AvroBytes.call(FlumeEvent2AvroBytes.java:45)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
at com.abc.xyz.zzz.ParseFrameHolder$ToKafkaStream.call(ParseFrameHolder.java:123)
at com.abc.xyz.zzz.ParseFrameHolder$ToKafkaStream.call(ParseFrameHolder.java:82)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$35.apply(RDD.scala:927)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$35.apply(RDD.scala:927)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at org.apache.hadoop.net.SocketInputStream$Reader.performIO(SocketInputStream.java:57)
at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
at java.io.FilterInputStream.read(FilterInputStream.java:133)
at java.io.FilterInputStream.read(FilterInputStream.java:133)
at org.apache.hadoop.ipc.Client$Connection$PingInputStream.read(Client.java:554)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1116)
at org.apache.hadoop.ipc.Client$Connection.run(Client.java:1011)
As mentioned, we face this exception very intermittently, and when it does occur our application gets stuck, forcing us to restart it.
SOLUTIONS THAT WE TRIED:
Our first suspect was that we were overloading the active NameNode in cluster A, since component T opens a lot of DFS clients in parallel and performs file operations on different files (so there is no contention on the same files). To tackle this, we looked at two key NameNode parameters, dfs.namenode.handler.count and ipc.server.listen.queue.size, and bumped the latter from its default of 128 to 1024.
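For reference, both are server-side settings on the NameNode host and take effect after a NameNode restart: ipc.server.listen.queue.size lives in core-site.xml and dfs.namenode.handler.count in hdfs-site.xml (in our HDP setup these files are managed through Ambari). A sketch of the one change we actually made:

    <!-- core-site.xml on the NameNode host -->
    <property>
      <name>ipc.server.listen.queue.size</name>
      <!-- raised from the default of 128 -->
      <value>1024</value>
    </property>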
Unfortunately, the issue still persisted in component T, so we took a different approach and focused solely on finding the reason for the "Connection reset by peer" error. According to many articles and Stack Exchange discussions, it means that the peer has set the RST flag on the connection, which terminates it immediately. In our case, we identified the peer as the NameNode of cluster A.
Keeping the RST flag in mind, I dug into the internals of TCP, but only with respect to the reasons a RST is sent. In short, every listening socket keeps a backlog of connections waiting to be accepted; its size is bounded by the backlog the application passes to listen(), capped by the kernel parameter net.core.somaxconn, while half-open connections are limited by net.ipv4.tcp_max_syn_backlog.
Coming back to the point: when the backlog fills up completely, TCP can behave in one of two ways, and the behaviour is controlled by the kernel parameter net.ipv4.tcp_abort_on_overflow. With the default value of 0, the kernel simply drops new SYN packets while the backlog is full, which lets the sender retransmit them later. When set to 1, the kernel instead replies with a packet that has the RST flag set, abruptly terminating the connection.
We checked the values of the above-mentioned kernel parameters and found that net.core.somaxconn is set to 1024, net.ipv4.tcp_abort_on_overflow to 0 and net.ipv4.tcp_max_syn_backlog to 4096 across all the machines in both clusters.
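(We read these with sysctl; the same values can also be pulled straight from /proc on each node, for example with a trivial, hypothetical helper like this one:)

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    public class PrintKernelParams {
        public static void main(String[] args) throws IOException {
            String[] params = {
                "/proc/sys/net/core/somaxconn",             // 1024 on our nodes
                "/proc/sys/net/ipv4/tcp_abort_on_overflow", // 0 on our nodes
                "/proc/sys/net/ipv4/tcp_max_syn_backlog"    // 4096 on our nodes
            };
            for (String p : params) {
                System.out.println(p + " = " + Files.readAllLines(Paths.get(p)).get(0));
            }
        }
    }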
The only suspects we have left are the switches that connect cluster A to cluster B, because with net.ipv4.tcp_abort_on_overflow set to 0, none of the machines in either cluster should send a RST because of backlog overflow.
MY QUESTIONS
Our next approach is to identify which machine or switch (there is no router involved) is setting the RST flag, by analyzing the packets with tcpdump or Wireshark. We will also bump all of the queue sizes mentioned above to 4096 so that they can absorb bursty traffic.
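The plan is to capture only RST segments on the NameNode port, on both the cluster B client host and the cluster A NameNode, and compare the two captures: if a RST shows up at the receiver but was never emitted by the NameNode, something in between injected it. A hedged example of the kind of capture we have in mind (the output file name is a placeholder):

    tcpdump -i any -w rst-8020.pcap 'host clusterA.namenode.com and port 8020 and tcp[tcpflags] & tcp-rst != 0'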
The NameNode logs show no sign of any exceptions; the only observation is that the NameNode connection load, as seen in Ambari, peaked at certain points in time, and not necessarily when the "Connection reset by peer" exception occurred.
To conclude, I wanted to know whether we are on the right track to solve this problem, or are we just headed for a dead end?
P.S. I apologize for the length of this question; I wanted to give readers the entire context before asking for help or suggestions. Thank you for your patience.
First of all, there may indeed be something odd in your network, and perhaps you will manage to track it down with the steps that you mention.
That being said, when I look at those steps there is something in the design that I personally find counterintuitive.
You currently have step T doing both the transformation and the most fragile part, the inter-cluster transport. Perhaps you are seeing worse reliability than people normally do, but I would seriously consider separating the complicated part from the fragile part.
If you do this (or simply split the work into smaller chunks), it should be fairly straightforward to design a solution that may see its fragile step fail now and then, but simply retries it when that happens. And the retry comes at minimal cost, because only a small portion of the work needs to be redone.
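For illustration only, the kind of retry wrapper this suggests could be as simple as the sketch below (TransferStep, the attempt count and the backoff are invented for the example, not part of your code):

    import java.io.IOException;

    // The fragile part: moving one finished chunk from cluster B to cluster A.
    interface TransferStep {
        void run() throws IOException;
    }

    public class RetryingTransfer {
        static void runWithRetry(TransferStep step, int maxAttempts, long backoffMillis)
                throws IOException, InterruptedException {
            IOException last = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    step.run();
                    return;                                  // success, nothing to retry
                } catch (IOException e) {
                    last = e;                                // e.g. "Connection reset by peer"
                    System.err.println("attempt " + attempt + " failed: " + e.getMessage());
                    Thread.sleep(backoffMillis * attempt);   // simple linear backoff
                }
            }
            throw last;                                      // give up after maxAttempts
        }
    }

Because only the small chunk that was in flight has to be retried, an occasional reset then costs seconds instead of a restart of the whole application.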
To conclude: it may help to troubleshoot your connection problems, but if at all possible, you may want to design for intermittent failure instead.