
PySpark: How to Read Many JSON Files, Multiple Records Per File

I have a large dataset stored in an S3 bucket, but instead of being a single large file, it's composed of many (113K, to be exact) individual JSON files, each of which contains 100-1000 observations. These observations aren't at the top level, but require some navigation within each JSON to access, i.e. json["interactions"] is a list of dictionaries.

I'm trying to use Spark/PySpark (version 1.1.1) to parse through and reduce this data, but I can't figure out the right way to load it into an RDD, because it's neither all records in one file (in which case I'd use sc.textFile, though with the added complication here that the records are JSON) nor one record per file (in which case I'd use sc.wholeTextFiles).

Is my best option to use sc.wholeTextFiles and then use a map (or in this case flatMap?) to pull the multiple observations out from under a single filename key and into their own records? Or is there an easier way to do this that I'm missing?
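
For reference, the wholeTextFiles route I have in mind would look roughly like this (just a sketch; the bucket path is a placeholder and "interactions" is the key from my data):

import json

def pull_interactions(file_pair):
    # wholeTextFiles yields (filename, full file contents) pairs;
    # drop the filename and return the list of observations in the file
    filename, content = file_pair
    return json.loads(content)["interactions"]

records = sc.wholeTextFiles("s3n://my-bucket/path/").flatMap(pull_interactions)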

I've seen answers here that suggest just using json.loads() on all files loaded via sc.textFile, but it doesn't seem like that would work for me because the JSONs aren't simple top-level lists.

asked Nov 30 '22 by cmwild


1 Answer

The previous answers are not going to read the files in a distributed fashion (see reference). To do so, you would need to parallelize the S3 keys and then read in the files during a flatMap step, as below.

import boto3
import json
from pyspark.sql import Row

def distributedJsonRead(s3Key):
    # Runs on the executors: fetch one JSON file from S3 and parse it
    s3obj = boto3.resource('s3').Object(bucket_name='bucketName', key=s3Key)
    contents = json.loads(s3obj.get()['Body'].read().decode('utf-8'))
    # Emit one Row per observation stored under the "interactions" key
    for interaction in contents['interactions']:
        yield Row(**interaction)

pkeys = sc.parallelize(keyList)  # keyList is a list of S3 keys
dataRdd = pkeys.flatMap(distributedJsonRead)
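
One way to build keyList on the driver is to page through the bucket listing with boto3 (a sketch; the bucket name and prefix are placeholders):

# Collect the S3 keys for all 113K files on the driver before parallelizing
s3client = boto3.client('s3')
paginator = s3client.get_paginator('list_objects_v2')
keyList = []
for page in paginator.paginate(Bucket='bucketName', Prefix='path/to/jsons/'):
    keyList.extend(obj['Key'] for obj in page.get('Contents', []))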

Boto3 Reference

answered Jan 03 '23 by David