
Get a list of files in S3 using PySpark in Databricks

I'm trying to generate a list of all S3 files in a bucket/folder. There are usually on the order of millions of files in the folder. I use boto right now, and it retrieves around 33k files per minute, which, even for a million files, takes half an hour. I also load these files into a dataframe, but I generate and use this list as a way to track which files are being processed.

What I've noticed is that when I ask Spark to read all the files in the folder, it does a listing of its own and lists them much faster than the boto call can, and then processes them. I looked for a way to do this in PySpark but found no good examples; the closest I got was some Java and Scala code that lists the files using the Hadoop (HDFS) FileSystem library.
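
For reference, the Java/Scala examples I found go through the Hadoop FileSystem API that Spark itself uses. A rough, untested sketch of the same idea in PySpark (reaching the JVM classes through spark.sparkContext._jvm; "s3a://my-bucket/my-prefix/" is just a placeholder) would look something like:

# Hypothetical sketch: list files through the Hadoop FileSystem API via
# PySpark's JVM gateway. The bucket/prefix below are placeholders.
sc = spark.sparkContext
hadoop_conf = sc._jsc.hadoopConfiguration()
HadoopPath = sc._jvm.org.apache.hadoop.fs.Path

root = HadoopPath("s3a://my-bucket/my-prefix/")
fs = root.getFileSystem(hadoop_conf)

files = []
it = fs.listFiles(root, True)  # True = recurse into sub-directories
while it.hasNext():
    status = it.next()
    if status.getPath().getName().endswith(".json"):
        files.append((status.getPath().toString(), status.getLen()))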

Is there a way we can do this in Python and Spark? For reference, I'm trying to replicate the following code snippet:

import boto3
from datetime import datetime
from pathlib import Path


def get_s3_files(source_directory: Path, file_type="json"):
    s3_resource = boto3.resource("s3")

    # source_directory is a pathlib.Path whose fourth component (parts[3]) is
    # the bucket name; everything after that forms the S3 key prefix.
    file_prepend_path = f"/{'/'.join(source_directory.parts[1:4])}"
    bucket_name = str(source_directory.parts[3])
    prefix = "/".join(source_directory.parts[4:])

    bucket = s3_resource.Bucket(bucket_name)

    s3_source_files = []

    # List every object under the prefix and keep only the requested file type.
    for obj in bucket.objects.filter(Prefix=prefix):
        if obj.key.endswith(f".{file_type}"):
            s3_source_files.append(
                (
                    f"{file_prepend_path}/{obj.key}",
                    obj.size,
                    str(source_directory),
                    str(datetime.now()),
                )
            )

    return s3_source_files
asked by CodingInCircles


2 Answers

This can be achieved very simply with dbutils:

def get_dir_content(ls_path):
    # List the current level, then recurse into each sub-directory.
    dir_paths = dbutils.fs.ls(ls_path)
    subdir_paths = [
        get_dir_content(p.path)
        for p in dir_paths
        if p.isDir() and p.path != ls_path
    ]
    flat_subdir_paths = [p for subdir in subdir_paths for p in subdir]
    # Return everything found at this level (directories included) plus the
    # flattened contents of the sub-directories.
    return [p.path for p in dir_paths] + flat_subdir_paths


paths = get_dir_content('s3 location')
for p in paths:
    print(p)
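
Note that the returned list contains directory paths as well as file paths. If, as in the question, you only care about one file type and want the result in a dataframe for tracking, a small (hypothetical) follow-up could be:

# Hedged sketch: keep only the .json files and load them into a DataFrame
# for tracking; the column name "file_path" is made up.
json_files = [p for p in paths if p.endswith(".json")]
files_df = spark.createDataFrame([(p,) for p in json_files], ["file_path"])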
answered by Karthikeyan Rasipalay Durairaj


For some reason, using the AWS CLI was roughly 15 times(!) faster than using boto. I'm not sure exactly why that is, but here's the code I'm currently using, in case someone finds it handy. Basically, it uses s3api to list the objects, then jq to manipulate the output into the form I want.

import subprocess


def get_s3_files(source_directory, schema, file_type="json"):
    # source_directory is a pathlib.Path; parts[3] is the bucket name and the
    # remaining components form the key prefix (same convention as the boto version).
    file_prepend_path = f"/{'/'.join(source_directory.parts[1:4])}"
    bucket = str(source_directory.parts[3])
    prefix = "/".join(source_directory.parts[4:])

    # List the objects with the AWS CLI, then use jq to keep only the wanted
    # file type and emit one CSV row per file: path, size, source dir, timestamp.
    s3_list_cmd = (
        f"aws s3api list-objects-v2 --bucket {bucket} --prefix {prefix} "
        f"| jq -r '.Contents[] | select(.Key | endswith(\".{file_type}\")) "
        f"| [\"{file_prepend_path}/\"+.Key, .Size, \"{source_directory}\", "
        f"(now | strftime(\"%Y-%m-%d %H:%M:%S.%s\"))] | @csv'"
    )

    s3_list = subprocess.check_output(s3_list_cmd, shell=True, universal_newlines=True)

    # Write the CSV locally, then read it back through Spark with the caller's schema.
    with open("s3_file_paths.csv", "w") as f:
        f.write(s3_list)

    s3_source_files_df = spark.read.option("header", False).schema(schema).csv("s3_file_paths.csv")

    return s3_source_files_df
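
For completeness, a hypothetical call might look like the following, assuming source_directory is a pathlib.Path of the form /dbfs/mnt/<bucket>/<prefix> (so that parts[3] is the bucket name) and the schema simply names the four CSV columns the jq command emits (the column names here are made up):

from pathlib import Path
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Hypothetical schema for the four CSV columns produced by the jq command:
# file path, object size, source directory, and listing timestamp.
file_list_schema = StructType([
    StructField("file_path", StringType(), True),
    StructField("file_size", LongType(), True),
    StructField("source_directory", StringType(), True),
    StructField("listed_at", StringType(), True),
])

files_df = get_s3_files(Path("/dbfs/mnt/my-bucket/some/prefix"), file_list_schema)
files_df.show(5, truncate=False)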
answered by CodingInCircles (the original asker)