 

Unify schema across multiple rows of JSON strings in a Spark DataFrame

Tags: python, pyspark

I have a tricky issue with a PySpark DataFrame column that contains a series of JSON strings.

The issue is that each row may have a different schema from the others, so when I want to transform these rows into a subscriptable data type in PySpark, I need a "unified" schema.

For example, consider this dataframe:

import pandas as pd
json_1 = '{"a": 10, "b": 100}'
json_2 = '{"a": 20, "c": 2000}'
json_3 = '{"c": 300, "b": "3000", "d": 100.0, "f": {"some_other": {"A": 10}, "maybe_this": 10}}'
df = spark.createDataFrame(pd.DataFrame({'A': [1, 2, 3], 'B': [json_1, json_2, json_3]}))
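
(The snippets here assume an interactive shell where spark and sc already exist. If you want to run them standalone, a minimal setup would look something like this:)

from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[*]').getOrCreate()
sc = spark.sparkContext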

Notice that each row contains a different version of the JSON structure. To handle this, I apply the following transforms:

import json
import pyspark.sql.functions as fcn
from pyspark.sql import Row
from collections import OrderedDict
from pyspark.sql import DataFrame as SparkDataFrame


def convert_to_row(d: dict) -> Row:
    """Convert a dictionary to a SparkRow.

    Parameters
    ----------
    d : dict
        Dictionary to convert.

    Returns
    -------
    Row

    """
    return Row(**OrderedDict(sorted(d.items())))


def get_schema_from_dictionary(the_dict: dict):
    """Create a schema from a dictionary.

    Parameters
    ----------
    the_dict : dict

    Returns
    -------
    schema
        Schema understood by PySpark.

    """
    return spark.read.json(sc.parallelize([json.dumps(the_dict)])).schema


def get_universal_schema(df: SparkDataFrame, column: str):
    """Given a dataframe, retrieve the "global" schema for the column.

    NOTE: It does this by merging across all the rows, so this will
          take a long time for larger dataframes.

    Parameters
    ----------
    df : SparkDataFrame
        Dataframe containing the column
    column : str
        Column to parse.

    Returns
    -------
    schema
        Schema understood by PySpark.

    """
    col_values = [json.loads(getattr(item, column)) for item in df.select(column).collect()]
    mega_dict = {}
    for value in col_values:
        mega_dict = {**mega_dict, **value}

    return get_schema_from_dictionary(mega_dict)


def get_sample_schema(df, column):
    """Given a dataframe, sample a single value to convert.

    NOTE: This assumes that the dataframe has the same schema
          over all rows.

    Parameters
    ----------
    df : SparkDataFrame
        Dataframe containing the column
    column : str
        Column to parse.

    Returns
    -------
    schema
        Schema understood by PySpark.

    """
    return get_universal_schema(df.limit(1), column)


def from_json(df: SparkDataFrame, column: str, manual_schema=None, merge: bool = False) -> SparkDataFrame:
    """Convert json-string column to a subscriptable object.

    Parameters
    ----------
    df : SparkDataFrame
        Dataframe containing the column
    column : str
        Column to parse.
    manual_schema : pyspark.sql.types.StructType, optional
        Schema understood by PySpark, by default None
    merge : bool, optional
        Parse the whole dataframe to extract a global schema, by default False

    Returns
    -------
    SparkDataFrame

    """
    if manual_schema is None or manual_schema == {}:
        if merge:
            schema = get_universal_schema(df, column)
        else:
            schema = get_sample_schema(df, column)
    else:
        schema = manual_schema

    return df.withColumn(column, fcn.from_json(column, schema))

Then I can simply do the following to get a new dataframe with a unified schema:

df = from_json(df, column='B', merge=True)
df.printSchema()
root
 |-- A: long (nullable = true)
 |-- B: struct (nullable = true)
 |    |-- a: long (nullable = true)
 |    |-- b: string (nullable = true)
 |    |-- c: long (nullable = true)
 |    |-- d: double (nullable = true)
 |    |-- f: struct (nullable = true)
 |    |    |-- maybe_this: long (nullable = true)
 |    |    |-- some_other: struct (nullable = true)
 |    |    |    |-- A: long (nullable = true)

Now we come to the crux of the issue. Since I'm collecting every row to the driver here: col_values = [json.loads(getattr(item, column)) for item in df.select(column).collect()], I'm limited by the amount of memory on the master node.

How can I do a similar procedure such that the work is distributed across the workers instead, before collecting to the master node?

asked May 08 '20 by Adam Fjeldsted

2 Answers

If I understand your question correctly: since spark.read.json() accepts an RDD as its path parameter, and an RDD is distributed (which avoids the potential OOM issue of calling collect() on a large dataset), you can adjust the function get_universal_schema to the following:

def get_universal_schema(df: SparkDataFrame, column: str):
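    # parse the JSON strings on the workers; only the inferred schema comes back to the driver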
    return spark.read.json(df.select(column).rdd.map(lambda x: x[0])).schema

and keep the other two functions, get_sample_schema() and from_json(), as-is.
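
As a quick sanity check (a minimal sketch, assuming the example df from the question), the adjusted function is a drop-in replacement: the merged schema comes out the same and the nested fields become subscriptable:

df = from_json(df, column='B', merge=True)
df.printSchema()   # same unified schema as shown in the question
df.select('B.a', 'B.f.some_other.A').show()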

answered Nov 04 '22 by jxc


Spark DataFrames are designed to work with data that has a schema. The DataFrame API exposes methods that are only useful on data with a defined schema, such as groupBy on a column or aggregation functions that operate on columns.

Given the requirements presented in the question, it appears to me that there is no fixed schema in the input data, so you won't benefit from the DataFrame API. In fact, it will likely add more constraints.

I think it is better to consider this data "schemaless" and use a lower-level API: the RDDs. RDDs are distributed across the cluster by definition, so with the RDD API you can first pre-process the data (consuming it as text) and then convert it to a DataFrame.
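
Here is a minimal sketch of that idea, assuming the JSON strings live in column 'B' of the question's df, and where normalize is a hypothetical placeholder for whatever per-record cleanup your data needs:

import json

# pull the raw JSON strings out as a distributed RDD of text
rdd = df.select('B').rdd.map(lambda row: row[0])

def normalize(s: str) -> str:
    """Placeholder pre-processing: parse, fix up keys/values, re-serialize."""
    d = json.loads(s)
    # ... apply whatever normalization the data needs here ...
    return json.dumps(d)

# pre-process on the workers, then let Spark infer the merged schema
cleaned = rdd.map(normalize)
result = spark.read.json(cleaned)
result.printSchema()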

answered Nov 04 '22 by Timur