I would like to read Parquet data stored on S3 from PySpark.
I've downloaded spark from here:
http://www.apache.org/dist/spark/spark-2.1.0/spark-2.1.0-bin-hadoop2.7.tgz
And installed it into Python naively:
cd python
python setup.py install
This seems to work fine and I can import pyspark, make a SparkContext, etc. However, when I go to read some publicly accessible Parquet data I do the following:
import pyspark
sc = pyspark.SparkContext('local[4]')
sql = pyspark.SQLContext(sc)
df = sql.read.parquet('s3://bucket-name/mydata.parquet')
And I receive the following exception:
Py4JJavaError: An error occurred while calling o55.parquet.
: java.io.IOException: No FileSystem for scheme: s3
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:372)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:370)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:344)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:370)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:441)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
This error pops up a fair amount in Google searches, but so far none of the solutions offered have been helpful.
I'm on Linux (Ubuntu 16.04) on a personal computer without much else installed (everything is pretty stock).
I downgraded to http://www.apache.org/dist/spark/spark-2.1.0/spark-2.1.0-bin-hadoop2.4.tgz to have AWS included by default.
Now unfortunately my AWS credentials aren't being picked up. I've tried a few things:
Including them as SparkConf parameters:
conf = (pyspark.SparkConf()
        .set('fs.s3.awsAccessKeyId', '...')
.set('fs.s3.awsSecretAccessKey', '...'))
sc = pyspark.SparkContext('local[4]', conf=conf)
Unfortunately, in all cases I receive a traceback like the following:
IllegalArgumentException: 'AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey properties (respectively).'
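For what it's worth, the error message itself names two mechanisms: embedding the keys in the S3 URL, or setting the fs.s3 properties. A rough sketch of the URL form, with hypothetical placeholder credentials (a secret key containing '/' or '+' would likely need URL-encoding first), would look something like this:

import pyspark

sc = pyspark.SparkContext('local[4]')
sql = pyspark.SQLContext(sc)

# Placeholder credentials embedded directly in the URL, as the error message suggests.
ACCESS_KEY = 'AKIA...'   # hypothetical access key id
SECRET_KEY = '...'       # hypothetical secret access key
df = sql.read.parquet('s3://{}:{}@bucket-name/mydata.parquet'.format(ACCESS_KEY, SECRET_KEY))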
Using the Hadoop-2.4 build of the pre-built Spark 2.x binary (which I believe ships with S3 functionality), you can programmatically configure Spark to pull S3 data in the following manner:
import pyspark

conf = pyspark.SparkConf()
sc = pyspark.SparkContext('local[4]', conf=conf)

# Set the credentials on the underlying Hadoop configuration (note the s3n scheme);
# fill in your AWS access key id and secret access key here.
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "")

sql = pyspark.SQLContext(sc)
df = sql.read.parquet('s3n://bucket-name/mydata.parquet')
A critical thing to note is the s3n prefix in both the bucket URI and the configuration property names.
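If you would rather keep everything in the SparkConf (as in the attempt above), the same properties can in principle be passed by prefixing them with spark.hadoop., which Spark copies into the Hadoop configuration at startup. A minimal sketch, assuming the same Hadoop-2.4 build and placeholder credentials:

import pyspark

# spark.hadoop.* entries are copied into the Hadoop configuration,
# so this should be equivalent to the hadoopConfiguration().set(...) calls above.
conf = (pyspark.SparkConf()
        .set('spark.hadoop.fs.s3n.awsAccessKeyId', '...')       # placeholder
        .set('spark.hadoop.fs.s3n.awsSecretAccessKey', '...'))  # placeholder
sc = pyspark.SparkContext('local[4]', conf=conf)
sql = pyspark.SQLContext(sc)
df = sql.read.parquet('s3n://bucket-name/mydata.parquet')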