
How to read input from S3 in a Spark Streaming EC2 cluster application

I'm trying to make my Spark Streaming application read its input from an S3 directory, but I keep getting this exception after launching it with the spark-submit script:

Exception in thread "main" java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).
    at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:66)
    at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:49)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
    at org.apache.hadoop.fs.s3native.$Proxy6.initialize(Unknown Source)
    at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:216)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
    at org.apache.spark.streaming.StreamingContext.checkpoint(StreamingContext.scala:195)
    at MainClass$.main(MainClass.scala:1190)
    at MainClass.main(MainClass.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

I'm setting those properties through this block of code, as suggested here http://spark.apache.org/docs/latest/ec2-scripts.html (bottom of the page):

val ssc = new org.apache.spark.streaming.StreamingContext(
  conf,
  Seconds(60))
ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", args(2))
ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", args(3))

args(2) and args(3) are my AWS Access Key ID and Secret Access Key, of course.

Why does it keep saying they are not set?

EDIT: I also tried it this way, but I get the same exception:

val lines = ssc.textFileStream("s3n://"+ args(2) +":"+ args(3) + "@<mybucket>/path/") 
asked Jun 04 '14 by gprivitera

1 Answer

Odd. Try also doing a .set on the sparkContext (a sketch of that is below). Also try exporting the env variables before you start the application:

export AWS_ACCESS_KEY_ID=<your access>
export AWS_SECRET_ACCESS_KEY=<your secret>

^^this is how we do it.
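
For the .set route mentioned above, here's a minimal sketch, assuming Spark's spark.hadoop.* pass-through (properties with that prefix get copied into the Hadoop configuration) and an s3n:// path; the app name and bucket path are placeholders, not from the question:

// Minimal sketch, not verbatim from this answer: put the s3n credentials on the
// SparkConf via the spark.hadoop.* prefix, then build the streaming context from it.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("MyStreamingApp")  // placeholder name
  .set("spark.hadoop.fs.s3n.awsAccessKeyId", sys.env("AWS_ACCESS_KEY_ID"))
  .set("spark.hadoop.fs.s3n.awsSecretAccessKey", sys.env("AWS_SECRET_ACCESS_KEY"))

val ssc = new StreamingContext(conf, Seconds(60))
val lines = ssc.textFileStream("s3n://my-bucket/path/")  // placeholder bucket/path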

UPDATE: According to @tribbloid, the above broke in 1.3.0; now you have to faff around for ages with hdfs-site.xml, or you can do this (and it works in a spark-shell):

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", myAccessKey)
hadoopConf.set("fs.s3.awsSecretAccessKey", mySecretKey)
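
With those properties in place, reading from the bucket in the same session should work; a hypothetical usage (the bucket and path are placeholders):

// Hypothetical usage in the same spark-shell session; bucket/path are made up.
val lines = sc.textFile("s3://my-bucket/some/path/")
lines.take(5).foreach(println)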
answered Oct 11 '22 by samthebest