AWS connection timeout when running Spark job on EMR

I'm trying to submit a simple Spark job to an Amazon EMR cluster. My cluster has five m4.2xlarge instances (1 master, 4 slaves), each with 16 vCPUs and 32 GB of memory.

This is my code:

import org.apache.spark.SparkConf
import org.apache.spark.graphx.{Edge, Graph, GraphXUtils, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
  val sparkConfig = new SparkConf()
    .set("hive.exec.dynamic.partition", "true")
    .set("hive.exec.dynamic.partition.mode", "nonstrict")
    .set("hive.s3.max-client-retries", "50")
    .set("hive.s3.max-error-retries", "50")
    .set("hive.s3.max-connections", "100")
    .set("hive.s3.connect-timeout", "5m")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrationRequired", "true")
    .set("spark.kryo.classesToRegister", "org.apache.spark.graphx.impl.VertexAttributeBlock")
    .set("spark.broadcast.compress", "true")

  val spark = SparkSession.builder()
    .appName("Spark Hive Example")
    .enableHiveSupport()
    .config(sparkConfig)
    .getOrCreate()

  // Register the GraphX classes with Kryo for serialization
  GraphXUtils.registerKryoClasses(sparkConfig)

  // Edges come from an S3-backed Hive table
  val res = spark.sql("SELECT col1, col2, col3 FROM table1 limit 10000")
  val edgesRDD = res.rdd.map(row => Edge(row.getString(0).hashCode, row.getString(1).hashCode, row(2).asInstanceOf[String]))

  // Vertices come from a second S3-backed Hive table
  val res_two = spark.sql("SELECT col1 FROM table2 where col1 is not NULL and col1 != '' limit 100000")
  val vertexRDD: RDD[(VertexId, String)] = res_two.rdd.map(row => (row.getString(0).hashCode, row(0).asInstanceOf[String]))

  val graph = Graph(vertexRDD, edgesRDD)

  val connectedComponents = graph.connectedComponents().vertices
}

Both table1 and table2 are S3-backed external Hive tables. When I run this program, the job fails with the following error:

Job aborted due to stage failure: Task 827 in stage 0.0 failed 4 times, most recent failure: Lost task 827.3 in stage 0.0 (TID 921, xxx.internal, executor 3): com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1069)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1035)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:742)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:716)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4169)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4116)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1237)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:24)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:10)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:82)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObjectMetadata(AmazonS3LiteClient.java:94)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AbstractAmazonS3Lite.getObjectMetadata(AbstractAmazonS3Lite.java:39)
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:211)
    at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy35.retrieveMetadata(Unknown Source)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:768)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:1194)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:773)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.open(EmrFileSystem.java:166)
    at org.apache.hadoop.hive.ql.io.orc.ReaderImpl.extractMetaInfoFromFooter(ReaderImpl.java:355)
    at org.apache.hadoop.hive.ql.io.orc.ReaderImpl.<init>(ReaderImpl.java:316)
    at org.apache.hadoop.hive.ql.io.orc.OrcFile.createReader(OrcFile.java:237)
    at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getReader(OrcInputFormat.java:1204)
    at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getRecordReader(OrcInputFormat.java:1113)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:246)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:245)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:203)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:94)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:286)
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:263)
    at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.$Proxy37.get(Unknown Source)
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:190)
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:184)
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:184)
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1190)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1030)
    ... 59 more

I'm not sure whether this comes from Hadoop or from reading Hive, but I saw a similar issue here, so I added the following parameters to my spark-submit command:

--conf "spark.driver.extraJavaOptions=-Djavax.net.ssl.sessionCacheSize=1000 -Djavax.net.ssl.sessionCacheTimeout=60" --conf "spark.executor.extraJavaOptions=-Djavax.net.ssl.sessionCacheSize=1000 -Djavax.net.ssl.sessionCacheTimeout=60"

It still doesn't work. Does anyone know what's going on?

asked Aug 31 '17 by drunkenfist

2 Answers

TLDR: The property you need to set is fs.s3.maxConnections in the emrfs-site.xml configuration file. It defaults to 50. We were getting exactly the same error/stack trace as you, so I set it to 5000, which fixed the problem and had no ill effects.

From what I can tell, the root cause is InputFormat implementations that do not properly use try...finally to ensure that connections are closed when exceptions are thrown. Notably, older versions of Hive, including v1.2.1 (which Spark is compiled against), exhibit this bug. Hive 2.x massively refactors OrcInputFormat, though I haven't verified that the bug is fixed, nor do I know if/when/how you can compile Spark against Hive 2.x.

The workaround increases the size of the connection pool, as suggested in another answer, but both the property and its location are quite different from those of the "classic" S3 filesystems (s3/s3a/s3n). Of course, this isn't documented anywhere and required decompiling the emrfs jar to tease out...
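
For illustration, if you launch the cluster through the EMR console or CLI, one way to apply this is an emrfs-site configuration classification at cluster creation. The snippet below is a sketch rather than part of the original setup; the value 5000 simply mirrors what worked for us:

[
  {
    "Classification": "emrfs-site",
    "Properties": {
      "fs.s3.maxConnections": "5000"
    }
  }
]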

answered Oct 11 '22 by Jonathan Traupman


I don't use EMRFS, but I do know that the other Spark/Hadoop S3 clients all use a pool of HTTP connections for their requests to S3, and a "timeout waiting for pool" message invariably means the pool isn't big enough. See if you can find out what the EMRFS options are for increasing that pool size. You will need at least one connection for every worker thread running in your process, and I'd double that in the hope that EMRFS parallelises block uploads the way the s3a client does.
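
Building on that advice, here is a minimal, untested sketch of sizing the pool from executor parallelism. It assumes EMRFS honours fs.s3.maxConnections when it is set in the job's Hadoop configuration; if EMRFS only reads emrfs-site.xml on the cluster nodes, set the property there (or via an emrfs-site configuration classification) as described in the other answer.

// Hypothetical sketch: size the S3 connection pool from the executor thread count.
// Assumption: EMRFS picks up fs.s3.maxConnections from the runtime Hadoop
// configuration; otherwise it must go into emrfs-site.xml at cluster setup.
val executorCores = spark.conf.getOption("spark.executor.cores").map(_.toInt).getOrElse(16) // 16 vCPUs per node, per the question
val poolSize = executorCores * 2 // at least one connection per task thread, doubled for headroom
spark.sparkContext.hadoopConfiguration.set("fs.s3.maxConnections", poolSize.toString)
// Do this before the first spark.sql(...) that touches the S3-backed tables.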

answered Oct 11 '22 by stevel