 

How to list files in S3 bucket using Spark Session?

Is it possible to list all of the files in a given S3 path (e.g. s3://my-bucket/my-folder/*.extension) using a SparkSession object?

asked Jan 06 '19 by code


People also ask

Can Spark access S3 data?

If you are using PySpark to access S3 buckets, you must pass the Spark engine the right packages to use, specifically aws-java-sdk and hadoop-aws. It is important to identify the right package versions to use.
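A minimal sketch of passing those packages to a SparkSession (the coordinates and versions below are assumptions; they must match the Hadoop build bundled with your Spark distribution):

from pyspark.sql import SparkSession

# Package versions are an assumption: pick hadoop-aws and aws-java-sdk-bundle
# versions that match the Hadoop version shipped with your Spark distribution.
spark = (
    SparkSession.builder
    .appName("s3-listing")
    .config("spark.jars.packages",
            "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262")
    .getOrCreate()
)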

How do I connect Spark on AWS to S3?

Configuring the Spark shell: start the Spark shell with the spark-csv DataFrames package, and set the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables so that Spark can communicate with S3. Once the environment variables are set, restart the Spark shell and enter your read commands.
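For example (a sketch only; the credentials, bucket, and path below are placeholders, and an IAM role or credentials file is usually preferable to hard-coded keys):

import os
from pyspark.sql import SparkSession

# Placeholder credentials; set these before the SparkSession (and its JVM) starts.
os.environ["AWS_ACCESS_KEY_ID"] = "<your-access-key>"
os.environ["AWS_SECRET_ACCESS_KEY"] = "<your-secret-key>"

spark = SparkSession.builder.appName("s3-csv").getOrCreate()

# The s3a connector (provided by hadoop-aws) can also take the keys from the
# Hadoop configuration; on EMR the s3:// scheme is used instead of s3a://.
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"])
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"])

df = spark.read.csv("s3a://my-bucket/my-folder/data.csv", header=True)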


2 Answers

You can use the Hadoop FileSystem API to access files on S3 (Spark uses it internally as well):

import java.net.URI
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration

val path = "s3://somebucket/somefolder"
val fileSystem = FileSystem.get(URI.create(path), new Configuration())
val it = fileSystem.listFiles(new Path(path), true)
while (it.hasNext()) {
  // each entry is a LocatedFileStatus describing one object under the prefix
  println(it.next().getPath())
}
answered Nov 07 '22 by Michael Spector


Approach 1

For pyspark users, I've translated Michael Spector's answer (I'll leave it to you to decide if using this is a good idea):

sc = spark.sparkContext
myPath = 's3://my-bucket/my-prefix/'
javaPath = sc._jvm.java.net.URI.create(myPath)
hadoopPath = sc._jvm.org.apache.hadoop.fs.Path(myPath)
hadoopFileSystem = sc._jvm.org.apache.hadoop.fs.FileSystem.get(javaPath, sc._jvm.org.apache.hadoop.conf.Configuration())
iterator = hadoopFileSystem.listFiles(hadoopPath, True)

s3_keys = []
while iterator.hasNext():
    s3_keys.append(iterator.next().getPath().toUri().getRawPath())    

s3_keys now holds all file keys found at my-bucket/my-prefix
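As a follow-up sketch (bucket name and extension are placeholders): getRawPath() returns the key without the scheme and bucket, so add them back if you want to hand the matching paths to a reader.

# Keep only keys with a particular extension and read the matching files back.
matching = [key for key in s3_keys if key.endswith('.extension')]
df = spark.read.text(['s3://my-bucket' + key for key in matching])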

Approach 2

Here is an alternative that I found (hat tip to @forgetso):

myPath = 's3://my-bucket/my-prefix/*'
hadoopPath = sc._jvm.org.apache.hadoop.fs.Path(myPath)
hadoopFs = hadoopPath.getFileSystem(sc._jvm.org.apache.hadoop.conf.Configuration())
statuses = hadoopFs.globStatus(hadoopPath)

for status in statuses:
    print(status.getPath().toUri().getRawPath())
    # Alternatively, you can get file names only with:
    # status.getPath().getName()
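If you only want files with a particular extension, as in the original question, the extension can go straight into the glob (a small sketch; bucket, prefix, and extension are placeholders):

myPath = 's3://my-bucket/my-prefix/*.extension'
hadoopPath = sc._jvm.org.apache.hadoop.fs.Path(myPath)
hadoopFs = hadoopPath.getFileSystem(sc._jvm.org.apache.hadoop.conf.Configuration())
# globStatus returns one FileStatus per path matching the pattern
matching = [status.getPath().toString() for status in hadoopFs.globStatus(hadoopPath)]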

Approach 3 (incomplete!)

The two approaches above do not use the Spark parallelism that would be applied on a distributed read; that logic appears to be private, though. See parallelListLeafFiles here. I have not found a way to compel pyspark to do a distributed ls on S3 without also reading the file contents. I tried to use py4j to instantiate an InMemoryFileIndex, but I can't get the incantation right. Here is what I have so far if someone wants to pick it up from here:

myPath = 's3://my-bucket/my-path/'
paths = sc._gateway.new_array(sc._jvm.org.apache.hadoop.fs.Path, 1)
paths[0] = sc._jvm.org.apache.hadoop.fs.Path(myPath)

emptyHashMap = sc._jvm.java.util.HashMap()
emptyScalaMap = sc._jvm.scala.collection.JavaConversions.mapAsScalaMap(emptyHashMap)

# Py4J is not happy with this:
sc._jvm.org.apache.spark.sql.execution.datasources.InMemoryFileIndex(
    spark._jsparkSession, 
    paths, 
    emptyScalaMap, 
    sc._jvm.scala.Option.empty() # Scala None (no user-specified schema)
)
answered Nov 07 '22 by Lou Zell