I am trying to read a JSON file, from Amazon s3, to create a spark context and use it to process the data.
Spark is basically in a docker container. So putting files in docker path is also PITA. Hence pushed it to S3.
The code below explains rest of the stuff.
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("first")
sc = SparkContext(conf=conf)
config_dict = {"fs.s3n.awsAccessKeyId":"**",
"fs.s3n.awsSecretAccessKey":"**"}
bucket = "nonamecpp"
prefix = "dataset.json"
filename = "s3n://{}/{}".format(bucket, prefix)
rdd = sc.hadoopFile(filename,
'org.apache.hadoop.mapred.TextInputFormat',
'org.apache.hadoop.io.Text',
'org.apache.hadoop.io.LongWritable',
conf=config_dict)
I get the following error -
Py4JJavaError Traceback (most recent call last)
<ipython-input-2-b94543fb0e8e> in <module>()
9 'org.apache.hadoop.io.Text',
10 'org.apache.hadoop.io.LongWritable',
---> 11 conf=config_dict)
12
/usr/local/spark/python/pyspark/context.pyc in hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter, valueConverter, conf, batchSize)
558 jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass,
559 valueClass, keyConverter, valueConverter,
--> 560 jconf, batchSize)
561 return RDD(jrdd, self)
562
/usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in __call__(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer, self.gateway_client,
--> 538 self.target_id, self.name)
539
540 for temp_arg in temp_args:
/usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.hadoopFile.
: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).
at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:73)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
at org.apache.hadoop.fs.s3native.$Proxy20.initialize(Unknown Source)
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:272)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2397)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:304)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
at org.apache.spark.rdd.RDD.take(RDD.scala:1060)
at org.apache.spark.rdd.RDD.first(RDD.scala:1093)
at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:202)
at org.apache.spark.api.python.PythonRDD$.hadoopFile(PythonRDD.scala:543)
at org.apache.spark.api.python.PythonRDD.hadoopFile(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:744)
I have clearly provided aswSecretAccessKey and awsAccessId. Whats going wrong?
If you are using PySpark to access S3 buckets, you must pass the Spark engine the right packages to use, specifically aws-java-sdk and hadoop-aws . It'll be important to identify the right package version to use. As of this writing aws-java-sdk 's 1.7. 4 version and hadoop-aws 's 2.7.
With Amazon EMR release version 5.17. 0 and later, you can use S3 Select with Spark on Amazon EMR. S3 Select allows applications to retrieve only a subset of data from an object.
I've solved adding --packages org.apache.hadoop:hadoop-aws:2.7.1
into spark-submit command.
It will download all hadoop missing packages that will allow you to execute spark jobs with S3.
Then in your job you need to set your AWS credentials like:
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", aws_id)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", aws_key)
Other option about setting your credentials is define them into spark/conf/spark-env:
#!/usr/bin/env bash
AWS_ACCESS_KEY_ID='xxxx'
AWS_SECRET_ACCESS_KEY='xxxx'
SPARK_WORKER_CORES=1 # to set the number of cores to use on this machine
SPARK_WORKER_MEMORY=1g # to set how much total memory workers have to give executors (e.g. 1000m, 2g)
SPARK_EXECUTOR_INSTANCES=10 #, to set the number of worker processes per node
More info:
I would suggest going through this link.
In my case, I used Instance profile credentials to access s3 data.
Instance profile credentials– used on EC2 instances, and delivered through the Amazon EC2 metadata service. The AWS SDK for Java uses the InstanceProfileCredentialsProvider to load these credentials.
Note
Instance profile credentials are used only if AWS_CONTAINER_CREDENTIALS_RELATIVE_URI is not set. See EC2ContainerCredentialsProviderWrapper for more information.
For pyspark, I use setting to access s3 content.
def get_spark_context(app_name):
# configure
conf = pyspark.SparkConf()
# init & return
sc = pyspark.SparkContext.getOrCreate(conf=conf)
# s3a config
sc._jsc.hadoopConfiguration().set('fs.s3a.endpoint',
's3.eu-central-1.amazonaws.com')
sc._jsc.hadoopConfiguration().set(
'fs.s3a.aws.credentials.provider',
'com.amazonaws.auth.InstanceProfileCredentialsProvider,'
'com.amazonaws.auth.profile.ProfileCredentialsProvider'
)
return pyspark.SQLContext(sparkContext=sc)
More on spark context here.
Please refer this for type S3 access.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With