I have a tricky issue with rows in a PySpark DataFrame that contain JSON strings.
The problem is that each row may follow a different schema, so when I want to transform those rows into a subscriptable datatype in PySpark, I need a "unified" schema.
For example, consider this dataframe:
import pandas as pd
json_1 = '{"a": 10, "b": 100}'
json_2 = '{"a": 20, "c": 2000}'
json_3 = '{"c": 300, "b": "3000", "d": 100.0, "f": {"some_other": {"A": 10}, "maybe_this": 10}}'
df = spark.createDataFrame(pd.DataFrame({'A': [1, 2, 3], 'B': [json_1, json_2, json_3]}))
Notice that each row contains a different version of the JSON string. To handle this, I apply the following transforms:
import json
import pyspark.sql.functions as fcn
from pyspark.sql import Row
from collections import OrderedDict
from pyspark.sql import DataFrame as SparkDataFrame


def convert_to_row(d: dict) -> Row:
    """Convert a dictionary to a SparkRow.

    Parameters
    ----------
    d : dict
        Dictionary to convert.

    Returns
    -------
    Row
    """
    return Row(**OrderedDict(sorted(d.items())))


def get_schema_from_dictionary(the_dict: dict):
    """Create a schema from a dictionary.

    Parameters
    ----------
    the_dict : dict

    Returns
    -------
    schema
        Schema understood by PySpark.
    """
    return spark.read.json(sc.parallelize([json.dumps(the_dict)])).schema


def get_universal_schema(df: SparkDataFrame, column: str):
    """Given a dataframe, retrieve the "global" schema for the column.

    NOTE: It does this by merging across all the rows, so this will
    take a long time for larger dataframes.

    Parameters
    ----------
    df : SparkDataFrame
        Dataframe containing the column
    column : str
        Column to parse.

    Returns
    -------
    schema
        Schema understood by PySpark.
    """
    col_values = [json.loads(getattr(item, column)) for item in df.select(column).collect()]
    mega_dict = {}
    for value in col_values:
        mega_dict = {**mega_dict, **value}
    return get_schema_from_dictionary(mega_dict)


def get_sample_schema(df, column):
    """Given a dataframe, sample a single value to convert.

    NOTE: This assumes that the dataframe has the same schema
    over all rows.

    Parameters
    ----------
    df : SparkDataFrame
        Dataframe containing the column
    column : str
        Column to parse.

    Returns
    -------
    schema
        Schema understood by PySpark.
    """
    return get_universal_schema(df.limit(1), column)


def from_json(df: SparkDataFrame, column: str, manual_schema=None, merge: bool = False) -> SparkDataFrame:
    """Convert json-string column to a subscriptable object.

    Parameters
    ----------
    df : SparkDataFrame
        Dataframe containing the column
    column : str
        Column to parse.
    manual_schema : PysparkSchema, optional
        Schema understood by PySpark, by default None
    merge : bool, optional
        Parse the whole dataframe to extract a global schema, by default False

    Returns
    -------
    SparkDataFrame
    """
    if manual_schema is None or manual_schema == {}:
        if merge:
            schema = get_universal_schema(df, column)
        else:
            schema = get_sample_schema(df, column)
    else:
        schema = manual_schema
    return df.withColumn(column, fcn.from_json(column, schema))
Then I can simply do the following to get a new dataframe with a unified schema:
df = from_json(df, column='B', merge=True)
df.printSchema()
root
|-- A: long (nullable = true)
|-- B: struct (nullable = true)
| |-- a: long (nullable = true)
| |-- b: string (nullable = true)
| |-- c: long (nullable = true)
| |-- d: double (nullable = true)
| |-- f: struct (nullable = true)
| | |-- maybe_this: long (nullable = true)
| | |-- some_other: struct (nullable = true)
| | | |-- A: long (nullable = true)
Now we come to the crux of the issue. Since I'm doing this:
col_values = [json.loads(getattr(item, column)) for item in df.select(column).collect()]
I'm limited by the amount of memory on the master node.
How can I do a similar procedure such that the work is distributed across the workers instead, before collecting to the master node?
If I understand your question correctly: since spark.read.json() accepts an RDD as its path parameter, and an RDD is distributed (which reduces the potential OOM risk of calling collect() on a large dataset), you can adjust the function get_universal_schema to the following:
def get_universal_schema(df: SparkDataFrame, column: str):
    return spark.read.json(df.select(column).rdd.map(lambda x: x[0])).schema
and keep the other two functions, get_sample_schema() and from_json(), as-is.
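For reference, a quick sanity check (just a sketch, assuming the example df and the question's from_json helper are unchanged): the call site stays the same, only the schema inference now runs on the workers.
# Sketch: same call as before, but get_universal_schema no longer collects
# the JSON strings to the driver before inferring the schema.
df = from_json(df, column='B', merge=True)
df.printSchema()  # should print the same unified schema shown in the question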
Spark DataFrames are designed to work with data that has a schema. The DataFrame API exposes methods that are useful on data with a defined schema, like groupBy on a column, or aggregation functions that operate on columns, etc.
Given the requirements presented in the question, it appears to me that there is no fixed schema in the input data, and that you won't benefit from the DataFrame API. In fact, it will likely add more constraints instead.
I think it is better to consider this data "schemaless" and use a lower-level API: the RDDs. RDDs are distributed across the cluster by definition, so using the RDD API you can first pre-process the data (consuming it as text), and then convert it to a DataFrame.
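A minimal sketch of that RDD-first approach, assuming the spark session and the example df from the question are available (the filter below is just a placeholder for whatever text-level pre-processing you actually need):
# Pull the raw JSON strings out of column 'B' as a distributed RDD of text.
json_rdd = df.select('B').rdd.map(lambda row: row[0])
# Pre-process the data as plain text on the workers, before any schema exists.
# Here we only drop empty/None entries; replace this with your own logic.
cleaned_rdd = json_rdd.filter(lambda s: s is not None and s.strip() != '')
# Only at the end let Spark infer a schema and build a DataFrame from the RDD.
parsed_df = spark.read.json(cleaned_rdd)
parsed_df.printSchema()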