Spark on Amazon EMR: "Timeout waiting for connection from pool"

I'm running a Spark job on a small three-node Amazon EMR 5 (Spark 2.0) cluster. The job runs for an hour or so, then fails with the error below. I can manually restart it and it works, processes more data, and eventually fails again.

My Spark code is fairly simple and does not use any Amazon or S3 APIs directly; it just passes S3 path strings to Spark, and Spark accesses S3 internally.

My Spark program just does the following in a loop: load data from S3 -> process -> write the results to a different S3 location.
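A minimal PySpark sketch of that kind of loop (the bucket names, prefixes, and filter are placeholders, not my actual job):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("s3-loop").getOrCreate()

for day in ["2016-08-01", "2016-08-02", "2016-08-03"]:
    # load data from S3 (Spark/EMRFS handles the S3 access internally)
    df = spark.read.json("s3://input-bucket/events/{}/".format(day))
    # process
    processed = df.filter(df["status"] == "ok")
    # write to a different S3 location
    processed.write.mode("overwrite").parquet("s3://output-bucket/processed/{}/".format(day))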

My first suspicion is that some internal Amazon or Spark code is not properly disposing of connections, so the connection pool eventually becomes exhausted.

com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.AmazonClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
            at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:618)
            at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
            at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
            at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
            at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3826)
            at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1015)
            at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:991)
            at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:212)
            at sun.reflect.GeneratedMethodAccessor45.invoke(Unknown Source)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
            at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
            at com.sun.proxy.$Proxy44.retrieveMetadata(Unknown Source)
            at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:780)
            at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1428)
            at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.exists(EmrFileSystem.java:313)
            at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:85)
            at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:60)
            at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:58)
            at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
            at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
            at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
            at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
            at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
            at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:487)
            at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
            at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194)
            at sun.reflect.GeneratedMethodAccessor85.invoke(Unknown Source)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
            at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
            at py4j.Gateway.invoke(Gateway.java:280)
            at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
            at py4j.commands.CallCommand.execute(CallCommand.java:79)
            at py4j.GatewayConnection.run(GatewayConnection.java:211)
            at java.lang.Thread.run(Thread.java:745)
    Caused by: com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
            at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingClientConnectionManager.leaseConnection(PoolingClientConnectionManager.java:226)
            at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingClientConnectionManager$1.getConnection(PoolingClientConnectionManager.java:195)
            at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
            at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.$Proxy45.getConnection(Unknown Source)
            at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:423)
            at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863)
            at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
            at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
            at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:837)
            at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607)
            ... 41 more

asked Aug 27 '16 by clay


1 Answer

I encountered this issue with a very trivial program on EMR (read data from S3, filter, write to S3).

I was able to solve it by switching to the S3A filesystem implementation and raising fs.s3a.connection.maximum to 100 for a bigger connection pool (the default is 15; see "Hadoop-AWS module: Integration with Amazon Web Services" in the Hadoop documentation for more configuration properties).

This is how I set the configuration:

// in Scala
val hc = sc.hadoopConfiguration
hc.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hc.setInt("fs.s3a.connection.maximum", 100)

# in Python (not tested) -- the same calls, made through py4j
hc = sc._jsc.hadoopConfiguration()
hc.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hc.setInt("fs.s3a.connection.maximum", 100)

For this to take effect, the S3 URIs passed to Spark have to start with s3a:// (instead of s3:// or s3n://), as in the example below.
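For example (a sketch only, assuming a SparkSession named spark and placeholder bucket names, not the original job):

df = spark.read.json("s3a://input-bucket/events/2016-08-01/")
df.filter(df["status"] == "ok") \
    .write.mode("overwrite") \
    .parquet("s3a://output-bucket/processed/2016-08-01/")

The same properties should also be settable at submit time through Spark's spark.hadoop.* prefix, e.g. spark-submit --conf spark.hadoop.fs.s3a.connection.maximum=100 --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem ..., which copies them into the Hadoop configuration without any code changes.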

answered Sep 28 '22 by hiddenbit