I have a public available Amazon s3 resource (text file) and want to access it from spark. That means - I don't have any Amazon credentials - it works fine if I want to just download it:
val bucket = "<my-bucket>"
val key = "<my-key>"
val client = new AmazonS3Client
val o = client.getObject(bucket, key)
val content = o.getObjectContent // <= can be read and used as input stream
However, when I try to access the same resource from spark context
val conf = new SparkConf().setAppName("app").setMaster("local")
val sc = new SparkContext(conf)
val f = sc.textFile(s"s3a://$bucket/$key")
println(f.count())
I receive the following error with stacktrace:
Exception in thread "main" com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521)
at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2653)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:92)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2687)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2669)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:371)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:221)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781)
at org.apache.spark.rdd.RDD.count(RDD.scala:1099)
at com.example.Main$.main(Main.scala:14)
at com.example.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
I don't want to provide any AWS credentials - I just want to access resource anonymously (for now) - how to achieve this? I probably need to make it use something like AnonymousAWSCredentialsProvider - but how to put it inside spark or hadoop?
P.S. My build.sbt just in case
scalaVersion := "2.11.7"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.4.1",
"org.apache.hadoop" % "hadoop-aws" % "2.7.1"
)
UPDATED: After I did some investigations - I see the reason why itsn't working.
First of all, S3AFileSystem creates AWS client with the following order of credentials:
AWSCredentialsProviderChain credentials = new AWSCredentialsProviderChain(
new BasicAWSCredentialsProvider(accessKey, secretKey),
new InstanceProfileCredentialsProvider(),
new AnonymousAWSCredentialsProvider()
);
"accessKey" and "secretKey" values are taken from the spark conf instance (keys must be "fs.s3a.access.key" and "fs.s3a.secret.key" or org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY and org.apache.hadoop.fs.s3a.Constants.SECRET_KEY constants, which is more convenient).
Second - you probably see that AnonymousAWSCredentialsProvider is the third option (last priority) - what could possible be wrong with that? See the implementation of AnonymousAWSCredentials:
public class AnonymousAWSCredentials implements AWSCredentials {
public String getAWSAccessKeyId() {
return null;
}
public String getAWSSecretKey() {
return null;
}
}
It simply returns null for both access key and secret key. Sounds reasonable. But look inside AWSCredentialsProviderChain:
AWSCredentials credentials = provider.getCredentials();
if (credentials.getAWSAccessKeyId() != null &&
credentials.getAWSSecretKey() != null) {
log.debug("Loading credentials from " + provider.toString());
lastUsedProvider = provider;
return credentials;
}
It doesn't choose provider in case both keys are null - that means anonymous credentials can't work. Looks like a bug inside aws-java-sdk-1.7.4. I tried to use latest version - but it's incompatible with hadoop-aws-2.7.1.
Any other ideas?
I personally never accessed public data from Spark. You can try to use dummy credentials, or to create ones just for this usage. Set them directly on the SparkConf object.
val sparkConf: SparkConf = ???
val accessKeyId: String = ???
val secretAccessKey: String = ???
sparkConf.set("spark.hadoop.fs.s3.awsAccessKeyId", accessKeyId)
sparkConf.set("spark.hadoop.fs.s3n.awsAccessKeyId", accessKeyId)
sparkConf.set("spark.hadoop.fs.s3.awsSecretAccessKey", secretAccessKey)
sparkConf.set("spark.hadoop.fs.s3n.awsSecretAccessKey", secretAccessKey)
As an alternative, read the documentation of DefaultAWSCredentialsProviderChain
to see where the credentials are looked for. The list (order is important) is:
- Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
- Java System Properties - aws.accessKeyId and aws.secretKey
- Credential profiles file at the default location (~/.aws/credentials) shared by all AWS SDKs and the AWS CLI
- Instance profile credentials delivered through the Amazon EC2 metadata service
This is what helped me:
val session = SparkSession.builder()
.appName("App")
.master("local[*]")
.config("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
.getOrCreate()
val df = session.read.csv(filesFromS3:_*)
Versions:
"org.apache.spark" %% "spark-sql" % "2.4.0",
"org.apache.hadoop" % "hadoop-aws" % "2.8.5",
Documentation: https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html#Authentication_properties
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With