Is it possible to list all of the files in a given S3 path (e.g. s3://my-bucket/my-folder/*.extension) using a SparkSession object?
If you are using PySpark to access S3 buckets, you must pass the Spark engine the right packages to use, specifically aws-java-sdk and hadoop-aws. It is important to pick package versions that match your Spark and Hadoop build.
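For example, here is a minimal PySpark sketch of pulling those packages in when building the session (the coordinates and versions below are placeholders; they must match your Spark and Hadoop distribution):

from pyspark.sql import SparkSession

# Placeholder package versions; pick ones that match your Hadoop/Spark build.
spark = (
    SparkSession.builder
    .appName('list-s3-files')
    .config(
        'spark.jars.packages',
        'org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262',
    )
    .getOrCreate()
)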
Configuring the Spark shell: start the Spark shell (with the spark-csv package if you also need to read CSV files), set the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables so Spark can communicate with S3, and restart the shell so the settings take effect.
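As a hedged sketch, assuming those environment variables are already exported and you are going through the s3a connector, you can also copy them into the Hadoop configuration from PySpark:

import os

# Hand the AWS credentials from the environment to the Hadoop configuration.
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set('fs.s3a.access.key', os.environ['AWS_ACCESS_KEY_ID'])
hadoop_conf.set('fs.s3a.secret.key', os.environ['AWS_SECRET_ACCESS_KEY'])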
You can use the Hadoop API to access files on S3 (Spark uses it as well):
import java.net.URI
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
val path = "s3://somebucket/somefolder"
val fileSystem = FileSystem.get(URI.create(path), new Configuration())
val it = fileSystem.listFiles(new Path(path), true)
while (it.hasNext()) {
  // each element is a LocatedFileStatus; getPath returns the file's full path
  println(it.next().getPath)
}
Approach 1
For pyspark users, I've translated Michael Spector's answer (I'll leave it to you to decide if using this is a good idea):
sc = spark.sparkContext
myPath = 's3://my-bucket/my-prefix/'
javaPath = sc._jvm.java.net.URI.create(myPath)
hadoopPath = sc._jvm.org.apache.hadoop.fs.Path(myPath)
hadoopFileSystem = sc._jvm.org.apache.hadoop.fs.FileSystem.get(javaPath, sc._jvm.org.apache.hadoop.conf.Configuration())
iterator = hadoopFileSystem.listFiles(hadoopPath, True)
s3_keys = []
while iterator.hasNext():
    s3_keys.append(iterator.next().getPath().toUri().getRawPath())
s3_keys now holds all file keys found at my-bucket/my-prefix.
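Since the question asks for a particular extension, you can filter the collected keys afterwards (a small sketch; '.extension' is just a placeholder suffix):

# Keep only the keys that end with the extension you care about.
matching_keys = [key for key in s3_keys if key.endswith('.extension')]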
Approach 2
Here is an alternative that I found (hat tip to @forgetso):
myPath = 's3://my-bucket/my-prefix/*'
hadoopPath = sc._jvm.org.apache.hadoop.fs.Path(myPath)
hadoopFs = hadoopPath.getFileSystem(sc._jvm.org.apache.hadoop.conf.Configuration())
statuses = hadoopFs.globStatus(hadoopPath)
for status in statuses:
    print(status.getPath().toUri().getRawPath())
    # Alternatively, you can get file names only with:
    # status.getPath().getName()
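If all you want is the matched paths as a Python list, you can collect them in one pass (a small sketch building on statuses above):

# Collect the fully qualified URIs matched by the glob.
matched_paths = [status.getPath().toString() for status in statuses]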
Approach 3 (incomplete!)
The two approaches above do not use the Spark parallelism mechanism that would be applied on a distributed read. That logic looks private, though; see parallelListLeafFiles here. I have not found a way to compel pyspark to do a distributed ls on S3 without also reading the file contents. I tried to use py4j to instantiate an InMemoryFileIndex, but I can't get the incantation right. Here is what I have so far, in case someone wants to pick it up from here:
myPath = 's3://my-bucket/my-path/'
paths = sc._gateway.new_array(sc._jvm.org.apache.hadoop.fs.Path, 1)
paths[0] = sc._jvm.org.apache.hadoop.fs.Path(myPath)
emptyHashMap = sc._jvm.java.util.HashMap()
emptyScalaMap = sc._jvm.scala.collection.JavaConversions.mapAsScalaMap(emptyHashMap)
# Py4J is not happy with this:
sc._jvm.org.apache.spark.sql.execution.datasources.InMemoryFileIndex(
    spark._jsparkSession,
    paths,
    emptyScalaMap,
    sc._jvm.scala.Option.empty()  # an empty scala Option (i.e. None)
)