 

S3 SlowDown error in Spark on EMR

I am getting this error when writing a Parquet file. This has started to happen recently:

com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Please reduce your request rate. (Service: Amazon S3; Status Code: 503; Error Code: SlowDown; Request ID: 2CA496E2AB87DC16), S3 Extended Request ID: 1dBrcqVGJU9VgoT79NAVGyN0fsbj9+6bipC7op97ZmP+zSFIuH72lN03ZtYabNIA2KaSj18a8ho=
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1389)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:902)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3826)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.deleteObjects(AmazonS3Client.java:1777)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.DeleteObjectsCall.perform(DeleteObjectsCall.java:22)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.call.DeleteObjectsCall.perform(DeleteObjectsCall.java:7)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:75)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
    at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.deleteObjects(AmazonS3LiteClient.java:125)
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.deleteAll(Jets3tNativeFileSystemStore.java:355)
    at sun.reflect.GeneratedMethodAccessor121.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy28.deleteAll(Unknown Source)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.doSingleThreadedBatchDelete(S3NativeFileSystem.java:1331)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.delete(S3NativeFileSystem.java:663)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.delete(EmrFileSystem.java:296)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.cleanupJob(FileOutputCommitter.java:463)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.abortJob(FileOutputCommitter.java:482)
    at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:134)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:146)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:121)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:121)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:121)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:101)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:492)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:198)
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:494)
    at com.radius.network_data.ingestion.processors.NetworkDataPathProcessor.process(NetworkDataPathProcessor.scala:38)
    at com.radius.network_data.ingestion.NetworkDataIngestionPipeline.com$radius$network_data$ingestion$NetworkDataIngestionPipeline$$processClient(NetworkDataIngestionPipeline.scala:51)
    at com.radius.network_data.ingestion.NetworkDataIngestionPipeline$$anonfun$run$1$$anonfun$apply$1.apply(NetworkDataIngestionPipeline.scala:42)
    at com.radius.network_data.ingestion.NetworkDataIngestionPipeline$$anonfun$run$1$$anonfun$apply$1.apply(NetworkDataIngestionPipeline.scala:41)
    at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
    at com.radius.network_data.ingestion.NetworkDataIngestionPipeline$$anonfun$run$1.apply(NetworkDataIngestionPipeline.scala:41)
    at com.radius.network_data.ingestion.NetworkDataIngestionPipeline$$anonfun$run$1.apply(NetworkDataIngestionPipeline.scala:39)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
    at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Any idea what the problem could be? I am using these write options:

result.write
      .mode(SaveMode.Overwrite)
      .partitionBy("entityType")
      .parquet(joinedPath)
asked Sep 07 '17 by Mikel San Vicente


People also ask

Does EMR work with S3?

Yes. Amazon EMR works with Amazon S3 through the EMR File System (EMRFS), which lets clusters read and write data directly in S3. Note that S3 does not simply replace HDFS as the Hadoop storage layer: HDFS and EMRFS are both compatible with Amazon EMR, but they're not interchangeable.

Does Spark support Amazon S3?

Yes. Spark on Amazon EMR can read from and write to Amazon S3 through EMRFS, and with Amazon EMR release version 5.17.0 and later you can also use S3 Select with Spark.
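For illustration, a hedged sketch of reading with S3 Select from Spark on EMR 5.17.0+ (the bucket, path, and options are hypothetical, and `spark` is assumed to be the active SparkSession):

// S3 Select pushdown for CSV via the s3selectCSV data source.
// The bucket and key below are placeholders, not from the original post.
val df = spark.read
  .format("s3selectCSV")
  .option("header", "true")
  .option("delimiter", ",")
  .load("s3://my-bucket/data/records.csv")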

What is a 500 error on Amazon?

The error code 500 Internal Error indicates that Amazon S3 can't handle the request at that time. The error code 503 Slow Down typically indicates that the number of requests to your S3 bucket is very high, exceeding the request rate the bucket can sustain.
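If you hit this 503 through EMRFS, one common mitigation (an assumption here, not something the original post tried) is to raise the EMRFS retry count so throttled S3 calls back off and retry instead of failing. A minimal sketch, assuming `spark` is the active SparkSession:

// fs.s3.maxRetries is the EMRFS retry setting; the value 20 is
// illustrative, not a recommendation from the original post.
spark.sparkContext.hadoopConfiguration.set("fs.s3.maxRetries", "20")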


1 Answer

Apparently the problem was caused by writing too many very small part files; it was fixed by reducing the number of output partitions. Every part file costs extra S3 requests on each write and cleanup pass (the stack trace above shows the 503 surfacing from a batch deleteObjects call during job cleanup), so cutting the file count cuts the request rate.
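For illustration, a minimal sketch of that fix applied to the write options from the question (the coalesce call and the target of 64 partitions are assumptions, not from the original post):

// Fewer output partitions -> fewer part files -> fewer S3 requests
// per write and per cleanup. 64 is an illustrative target; tune it
// to your data volume.
result
  .coalesce(64)
  .write
  .mode(SaveMode.Overwrite)
  .partitionBy("entityType")
  .parquet(joinedPath)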

answered Sep 22 '22 by Mikel San Vicente