I have a simple Spark program running on an EMR cluster that converts a 60 GB CSV file into Parquet. When I submit the job I get the exception below.
391, ip-172-31-36-116.us-west-2.compute.internal, executor 96): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Slow Down (Service: Amazon S3; Status Code: 503; Error Code: 503 Slow Down; Request ID: D13A3F4D7DD970FA; S3 Extended Request ID: gj3cPalkkOwtaf9XN/P+sb3jX0CNHu/QF9WTabkgP2ISuXcXdbvYO1Irg0O54OCvKlLz8WoR8E4=), S3 Extended Request ID: gj3cPalkkOwtaf9XN/P+sb3jX0CNHu/QF9WTabkgP2ISuXcXdbvYO1Irg0O54OCvKlLz8WoR8E4=
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
With Amazon EMR release version 5.17.0 and later, you can use S3 Select with Spark on Amazon EMR. S3 Select allows applications to retrieve only a subset of data from an object.
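For reference, here is a minimal sketch of what that looks like from Spark on EMR (the bucket and path are placeholders, and the s3selectCSV data source only exists on EMR 5.17.0 and later):

// S3 Select pushes filtering down to S3, so only the needed subset of the object is transferred
val selected = spark.read
  .format("s3selectCSV")                        // EMR-specific data source
  .option("header", "true")
  .load("s3://your-bucket/path/to/data.csv")    // placeholder path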
503 SlowDown errors can also be caused by two or more simultaneous PUT calls on the same object key due to Amazon S3 Strong Consistency. When a PUT request is made in Amazon S3, Amazon S3 automatically stores your object redundantly across multiple Availability Zones.
s3 is a block-based overlay on top of Amazon S3, whereas s3n and s3a are object-based. s3n supports objects up to 5 GB where size is the concern, while s3a supports objects up to 5 TB and has higher performance. Note that s3a is the successor to s3n.
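On a plain Apache Spark/Hadoop setup (EMR ships its own EMRFS connector behind s3://), the connector is chosen simply by the URI scheme you pass to Spark; a small sketch with a placeholder bucket:

// s3a:// selects the S3A connector from the hadoop-aws module
val df = spark.read
  .option("header", "true")
  .csv("s3a://your-bucket/input/data.csv")      // placeholder path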
Amazon S3 Transfer Acceleration is a bucket-level feature that enables fast, easy, and secure transfers of files over long distances between your client and an S3 bucket. Transfer Acceleration is designed to optimize transfer speeds from across the world into S3 buckets.
The default job committer for Spark (called FileOutputCommitter) is not safe to use with S3, because it commits output by renaming, and S3 has no atomic rename. For example, if a failure occurs while the renaming operation is in progress, the data output can be corrupted. In addition to being unsafe, it can also be very slow.
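On EMR, one mitigation is the EMRFS S3-optimized committer; a sketch assuming EMR 5.19.0 or later, where that committer and this property exist (on 5.20.0 and later it is enabled by default):

import org.apache.spark.sql.SparkSession

// enable the EMRFS S3-optimized committer for Parquet writes, avoiding rename-based commits
val spark = SparkSession.builder
  .appName("csv-to-parquet")
  .config("spark.sql.parquet.fs.optimized.committer.optimization-enabled", "true")
  .getOrCreate()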
To maximize your security, you should not use any of the Authentication properties that require you to write secret keys to a properties file. The simplest way to confirm that your Spark cluster is handling S3 protocols correctly is to point a Spark interactive shell at the cluster and run a simple chain of operators.
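For example, a quick smoke test from spark-shell on the cluster (the bucket and object below are placeholders):

// read an existing S3 object and count its lines; a failure here points at connector or credential problems
val lines = spark.read.textFile("s3a://your-bucket/some-existing-file.csv")
println(lines.count())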
Coordinating the versions of the various required libraries is the most difficult part; writing application code for S3 is very straightforward. You need a working Spark cluster, as described in Managing a Spark Cluster with the spark-ec2 Script.
There are no S3 libraries in the core Apache Spark project. Spark uses libraries from Hadoop to connect to S3, and the integration between Spark, Hadoop, and the AWS services can feel a little finicky. We skip over the two older protocols (s3 and s3n) for this recipe and use s3a.
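As a sketch of the dependency side, assuming an sbt build (the version shown is illustrative; hadoop-aws must match the Hadoop version on your cluster):

// build.sbt -- hadoop-aws brings in the S3A connector and a matching AWS SDK
libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "2.7.7"  // match your cluster's Hadoop version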
503 Slow Down is a generic response from AWS services when you're making too many requests per second.
Possible solutions:

// write fewer, larger output files so fewer tasks are writing to S3 at the same time
df.repartition(100)

// or cut overall parallelism, e.g. by running with a single local thread
val spark = SparkSession.builder.appName("Simple Application").master("local[1]").getOrCreate()
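Putting the repartition idea back into the original CSV-to-Parquet job, a sketch with placeholder paths:

import org.apache.spark.sql.SparkSession

// fewer output partitions means fewer simultaneous S3 writes from the final stage
val spark = SparkSession.builder.appName("csv-to-parquet").getOrCreate()
val df = spark.read.option("header", "true").csv("s3://your-bucket/input/")
df.repartition(100)
  .write
  .mode("overwrite")
  .parquet("s3://your-bucket/output/")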
I'm surprised that things failed; the Apache s3a client backs off when it sees a problem like this: your work is done, just more slowly.
All of Sergey's advice is good. I'd start by coalescing small files and reducing workers: a smaller cluster can deliver more performance, and save money.
One more: if you are using SSE-KMS to encrypt the data, accessing that key can trigger throttle events too; that throttling is shared across all applications trying to use the KMS store.
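For context, this is roughly where SSE-KMS shows up in an S3A setup (a sketch; the exact property names vary slightly between Hadoop versions, and the key ARN is a placeholder), so you can check whether your job is going through KMS at all:

// route the Hadoop S3A encryption settings through Spark's spark.hadoop.* prefix
val spark = SparkSession.builder
  .config("spark.hadoop.fs.s3a.server-side-encryption-algorithm", "SSE-KMS")
  .config("spark.hadoop.fs.s3a.server-side-encryption.key", "arn:aws:kms:...:key/your-key-id")  // placeholder ARN
  .getOrCreate()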