For checkpointing purposes I'm trying to set up an Amazon S3 bucket as the checkpoint directory.
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "s3a://bucket-name/checkpoint.txt"
val sc = new SparkContext(conf) // conf is a SparkConf defined earlier
sc.setLocalProperty("spark.default.parallelism", "30")
sc.hadoopConfiguration.set("fs.s3a.access.key", "xxxxx")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "xxxxx")
sc.hadoopConfiguration.set("fs.s3a.endpoint", "bucket-name.s3-website.eu-central-1.amazonaws.com")
val ssc = new StreamingContext(sc, Seconds(10))
ssc.checkpoint(checkpointDir)
but it stops with this exception
Exception in thread "main" com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: 9D8E8002H3BBDDC7, AWS Error Code: null, AWS Error Message: Bad Request, S3 Extended Request ID: Qme5E3KAr/KX0djiq9poGXPJkmr0vuXAduZujwGlvaAl+oc6vlUpq7LIh70IF3LNgoewjP+HnXA=
at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:154)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2596)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.spark.streaming.StreamingContext.checkpoint(StreamingContext.scala:232)
at com.misterbell.shiva.StreamingApp$.main(StreamingApp.scala:89)
at com.misterbell.shiva.StreamingApp.main(StreamingApp.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
I don't understand why I got this error and I can't find any example.
s3 is a block-based overlay on top of Amazon S3, whereas s3n and s3a are not; they are object-based. Where size is the concern, s3n supports objects up to 5 GB, while s3a supports objects up to 5 TB and has higher performance. Note that s3a is the successor to s3n. I hope this helps.
Introducing the Hadoop S3A client: Hadoop's S3A client offers high-performance IO against the Amazon S3 object store and compatible implementations. It reads and writes S3 objects directly, is compatible with standard S3 clients, and is compatible with files created by the older s3n:// client and Amazon EMR's s3:// client.
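For reference, here is a minimal sketch of reading and writing through the s3a:// connector from Spark; the bucket name, paths, and keys are placeholders, and the hadoop-aws jar plus its matching AWS SDK must be on the classpath:

import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch; "my-bucket", the paths, and the keys are placeholders.
val conf = new SparkConf().setAppName("s3a-example")
val sc = new SparkContext(conf)
sc.hadoopConfiguration.set("fs.s3a.access.key", "xxxxx")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "xxxxx")

// Read an object through the s3a connector and write results back to the bucket.
val lines = sc.textFile("s3a://my-bucket/input/data.txt")
lines.filter(_.nonEmpty).saveAsTextFile("s3a://my-bucket/output/")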
When Spark is running in a cloud infrastructure, the credentials are usually automatically set up. spark-submit reads the AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_SESSION_TOKEN environment variables and sets the associated authentication options for the s3n and s3a connectors to Amazon S3.
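If you are not going through spark-submit (for example in a test harness), a rough, assumed equivalent is to copy those environment variables into the Hadoop configuration yourself; fs.s3a.session.token only matters when you use temporary STS credentials:

import org.apache.spark.SparkContext

// Sketch: mirror what spark-submit does with the AWS_* environment variables.
def configureS3aFromEnv(sc: SparkContext): Unit = {
  sys.env.get("AWS_ACCESS_KEY_ID").foreach(key => sc.hadoopConfiguration.set("fs.s3a.access.key", key))
  sys.env.get("AWS_SECRET_ACCESS_KEY").foreach(secret => sc.hadoopConfiguration.set("fs.s3a.secret.key", secret))
  sys.env.get("AWS_SESSION_TOKEN").foreach(token => sc.hadoopConfiguration.set("fs.s3a.session.token", token))
}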
With Amazon EMR release version 5.17.0 and later, you can use S3 Select with Spark on Amazon EMR. S3 Select allows applications to retrieve only a subset of data from an object.
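As an illustration, a hedged sketch using the s3selectCSV data source documented for EMR; the bucket path and column name are placeholders, and the header option is an assumption:

import org.apache.spark.sql.SparkSession

// Sketch: only works on an EMR 5.17.0+ cluster where the "s3selectCSV" source is available.
val spark = SparkSession.builder().appName("s3select-example").getOrCreate()

val df = spark.read
  .format("s3selectCSV")                 // "s3selectJson" for JSON objects
  .option("header", "true")              // assumed: treat the first line as column names
  .load("s3://my-bucket/path/to/csv/")   // placeholder path

df.filter("country = 'DE'").show()       // only the matching subset is retrieved from S3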
This message corresponds to something like a "bad endpoint" or an unsupported signature version.
As seen here, Frankfurt is the only region that does not support Signature Version 2, and it's the one I picked.
Of course, even after all my research I can't say exactly what a signature version is; it's not obvious from the documentation. But V2 seems to work with s3a.
The endpoint shown in the S3 console is not the real endpoint, it's just the website endpoint.
You have to use one of the real regional endpoints instead, like this:
sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.eu-west-1.amazonaws.com")
But it works by default with the US endpoint.
If you'd like to use a region that only supports Signature V4 in Spark anyway, you can pass the flag -Dcom.amazonaws.services.s3.enableV4
to the driver options and executor options at runtime. For example:
spark-submit --conf spark.driver.extraJavaOptions='-Dcom.amazonaws.services.s3.enableV4' \
--conf spark.executor.extraJavaOptions='-Dcom.amazonaws.services.s3.enableV4' \
... (other spark options)
With these settings Spark is able to write to Frankfurt (and other V4-only regions) even with a not-so-fresh AWS SDK version (com.amazonaws:aws-java-sdk:1.7.4 in my case).
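Putting the endpoint fix and the V4 flag together, here is a minimal sketch of the checkpoint setup against Frankfurt; the bucket name and keys are placeholders, and setting the system property only covers the driver, so the spark-submit flags above remain the safer route for executors:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Enable Signature V4 in the AWS SDK before any S3 client is created (driver side only).
System.setProperty("com.amazonaws.services.s3.enableV4", "true")

val conf = new SparkConf().setAppName("streaming-with-s3-checkpoint")
val sc = new SparkContext(conf)
sc.hadoopConfiguration.set("fs.s3a.access.key", "xxxxx")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "xxxxx")
// Use the real regional endpoint, not the static-website endpoint from the console.
sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.eu-central-1.amazonaws.com")

val ssc = new StreamingContext(sc, Seconds(10))
ssc.checkpoint("s3a://bucket-name/checkpoint")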