I'm trying to make my Spark Streaming application read its input from an S3 directory, but I keep getting this exception after launching it with the spark-submit script:
Exception in thread "main" java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).
    at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:66)
    at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:49)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
    at org.apache.hadoop.fs.s3native.$Proxy6.initialize(Unknown Source)
    at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:216)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
    at org.apache.spark.streaming.StreamingContext.checkpoint(StreamingContext.scala:195)
    at MainClass$.main(MainClass.scala:1190)
    at MainClass.main(MainClass.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
I'm setting those properties with this block of code, as suggested here http://spark.apache.org/docs/latest/ec2-scripts.html (bottom of the page):
val ssc = new org.apache.spark.streaming.StreamingContext(conf, Seconds(60))
ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", args(2))
ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", args(3))
args(2) and args(3) are my AWS Access Key ID and Secret Access Key, of course.
Why does it keep saying they are not set?
EDIT: I tried also this way but I get the same exception:
val lines = ssc.textFileStream("s3n://"+ args(2) +":"+ args(3) + "@<mybucket>/path/")
Odd. Try also doing a .set on the sparkContext. Try also exporting env variables before you start the application:
export AWS_ACCESS_KEY_ID=<your access>
export AWS_SECRET_ACCESS_KEY=<your secret>
^^this is how we do it.
UPDATE: According to @tribbloid the above broke in 1.3.0, now you have to faff around for ages and ages with hdfs-site.xml, or you can do (and this works in a spark-shell):
val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", myAccessKey)
hadoopConf.set("fs.s3.awsSecretAccessKey", mySecretKey)
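If you want to combine the two suggestions (keys exported as env variables, then set explicitly on the Hadoop configuration), something like the following might do as a rough sketch. The object name S3CredentialProps and its two methods are made up for illustration, they are not part of Spark or Hadoop. The property names are the ones from the exception message and from the snippet above, and the helper itself has no Spark dependency so it can be tried in a plain Scala REPL.

```scala
// Hypothetical helper: names are illustrative, not a Spark or Hadoop API.
object S3CredentialProps {

  /** Builds the credential properties for one URL scheme:
    * "s3n" as in the question, or "s3" as in the update above.
    * Returns Left with a message if either variable is missing or blank.
    */
  def fromEnv(env: Map[String, String],
              scheme: String = "s3n"): Either[String, Map[String, String]] = {
    def nonBlank(name: String): Option[String] =
      env.get(name).map(_.trim).filter(_.nonEmpty)

    (nonBlank("AWS_ACCESS_KEY_ID"), nonBlank("AWS_SECRET_ACCESS_KEY")) match {
      case (Some(accessKey), Some(secretKey)) =>
        Right(Map(
          s"fs.${scheme}.awsAccessKeyId"     -> accessKey,
          s"fs.${scheme}.awsSecretAccessKey" -> secretKey))
      case _ =>
        Left("AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY must both be set")
    }
  }

  /** Pushes every property through `set`, so the caller decides where they go. */
  def applyTo(props: Map[String, String])(set: (String, String) => Unit): Unit =
    props.foreach { case (key, value) => set(key, value) }
}

// Usage with the question's StreamingContext (assumes `ssc` already exists):
//   S3CredentialProps.fromEnv(sys.env) match {
//     case Right(props) =>
//       S3CredentialProps.applyTo(props)((k, v) =>
//         ssc.sparkContext.hadoopConfiguration.set(k, v))
//     case Left(err) => sys.error(err)
//   }
```

Note that the stack trace in the question fails inside StreamingContext.checkpoint, so presumably the properties have to be on the Hadoop configuration before ssc.checkpoint(...) (or anything else that opens an s3n:// path) is called. Failing fast on a missing variable at least makes it obvious whether the keys reached the driver at all.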