I have a flat parquet file where one varchar column stores JSON data as a string, and I want to transform this data into a nested structure, i.e. the JSON data becomes nested parquet. I know the schema of the JSON in advance, if this is of any help.
Here is what I have "accomplished" so far:
Building the sample data
# load packages
import pandas as pd
import json
import pyarrow as pa
import pyarrow.parquet as pq
# Create dummy data
# dummy data with JSON as string
person_data = {'Name': ['Bob'],
'Age': [25],
'languages': "{'mother_language': 'English', 'other_languages': ['German', 'French']}"
}
# from dict to pandas df
person_df = pd.DataFrame.from_dict(person_data)
# from pandas df to pyarrow table
person_pat = pa.Table.from_pandas(person_df)
# save as parquet file
pq.write_table(person_pat, 'output/example.parquet')
Script proposal
# load dummy data
sample = pq.read_table('output/example.parquet')
# transform to dict
sample_dict = sample.to_pydict()
# print with indent for checking
print(json.dumps(sample_dict, sort_keys=True, indent=4))
# parse the JSON string in each row and replace it with the resulting dict
sample_dict['languages'] = [json.loads(s) for s in sample_dict['languages']]
print(json.dumps(sample_dict, sort_keys=True, indent=4))
#type(sample_dict['languages'])
# how to keep the nested structure when going from dict -> pandas df -> pyarrow table?
# save dict as nested parquet...
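One way that might work is to skip pandas entirely and build the nested column directly with pyarrow, since the JSON schema is known in advance. A rough sketch (assuming the file written above, valid JSON in every row, and a made-up output name example_nested.parquet):
import json
import pyarrow as pa
import pyarrow.parquet as pq
# read the flat file back
sample = pq.read_table('output/example.parquet')
# parse each JSON string into a Python dict
languages_parsed = [json.loads(s) for s in sample.column('languages').to_pylist()]
# declare the nested type explicitly, since the JSON schema is known in advance
languages_type = pa.struct([
    ('mother_language', pa.string()),
    ('other_languages', pa.list_(pa.string()))
])
# replace the flat string column with a struct column
languages_array = pa.array(languages_parsed, type=languages_type)
nested = sample.set_column(
    sample.schema.get_field_index('languages'), 'languages', languages_array)
# write the nested table back to parquet
pq.write_table(nested, 'output/example_nested.parquet')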
So, my specific question is: what is the proper way to keep this nested structure when writing the data back out as nested parquet?
PySpark can do this in a simple way, as I show below. The main benefit of using PySpark is that the infrastructure scales as the data grows, whereas with plain Python this can become problematic: unless you use a framework like Dask, you will need bigger and bigger machines to run it.
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
# This is one way to create a PySpark dataframe from your sample, but there are others
nested_df = spark.read.json(sc.parallelize(["""
{'Name': ['Bob'],
'Age': [25],
'languages': "{'mother_language': 'English', 'other_languages': ['German', 'French']}"
}
"""]))
# nested_df is now a nested Spark dataframe; show() prints its content.
# 20 is the max number of rows to show and False means don't truncate columns that don't fit on the screen (show the full column content)
nested_df.show(20, False)
# Writes to a location as parquet
nested_df.write.parquet('/path/parquet')
# Reads the file from the previous location
spark.read.parquet('/path/parquet').show(20, False)
The output of this code is:
+----+-----+-----------------------------------------------------------------------+
|Age |Name |languages |
+----+-----+-----------------------------------------------------------------------+
|[25]|[Bob]|{'mother_language': 'English', 'other_languages': ['German', 'French']}|
+----+-----+-----------------------------------------------------------------------+
+----+-----+-----------------------------------------------------------------------+
|Age |Name |languages |
+----+-----+-----------------------------------------------------------------------+
|[25]|[Bob]|{'mother_language': 'English', 'other_languages': ['German', 'French']}|
+----+-----+-----------------------------------------------------------------------+
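Note that in the output above the languages column is still a plain string, not a nested struct. Since you know the JSON schema in advance, something along these lines (a sketch, not part of the original run; the target path /path/parquet_nested is made up) should turn it into a real struct before writing:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
# schema of the JSON held in the 'languages' string column
languages_schema = StructType([
    StructField('mother_language', StringType()),
    StructField('other_languages', ArrayType(StringType()))
])
# parse the string column into a struct column
# (allowSingleQuotes is true by default; set explicitly because the sample JSON uses single quotes)
parsed_df = nested_df.withColumn(
    'languages',
    from_json(col('languages'), languages_schema, {'allowSingleQuotes': 'true'}))
parsed_df.printSchema()
# writes a truly nested parquet file
parsed_df.write.parquet('/path/parquet_nested')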
To answer your question: once the data is in a Spark DataFrame with the proper nested types, writing it with write.parquet preserves that structure, and reading the file back (as shown above) returns the same nested schema.