
Is there a way to create schema information dynamically with pyspark and not escape characters in output jsonfile?

Tags: python, pyspark

Currently, PySpark formats the logFile and then loads it into Redshift.

Each record of the logFile is output in JSON format; I parse each record, add an item, and load the result into Redshift. However, the format of some items differs for each type (for the common items, a schema is applied beforehand). If I output the data as-is, escape characters end up in it. Is there a way to dynamically create the schema information so that the output JSON file has no escape characters?

-- Environment --

- spark 2.4.0
- python version 2.7.15

-- DataFrame --

>> df.printSchema()
root
 |-- Name: string (nullable = false)
 |-- d: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

>> df.show(2,False)
+------+------------------------------------------------------------+
|Name  |d                                                           |
+------+------------------------------------------------------------+
|Amber |[Body -> {"City": "Oregon", "Country": "US"}, BodyType -> 1]|
|Alfred|[Body -> {"Weight": 80, "Height": 176}, BodyType -> 2]      |
+------+------------------------------------------------------------+

-- Schema(For common item) --

>> print(json.dumps(schema.jsonValue(), indent=2))
{
  "fields": [
    {
      "metadata": {}, 
      "type": "string", 
      "name": "Name", 
      "nullable": false
    }, 
    {
      "metadata": {}, 
      "type": {
        "keyType": "string", 
        "type": "map", 
        "valueType": "string", 
        "valueContainsNull": true
      }, 
      "name": "d", 
      "nullable": false
    }
  ], 
  "type": "struct"
}

-- Code --

from pyspark.sql.types import *

rdd = sc.parallelize([
    ("Amber", {"Body": "{\"City\": \"Oregon\", \"Country\": \"US\"}", "BodyType": 1}),
    ("Alfred", {"Body": "{\"Weight\": 80, \"Height\": 176}", "BodyType": 2})
])
schema = StructType([
    StructField('Name', StringType(), False),
    StructField('d', MapType(StringType(), StringType()), False)
])
df = spark.createDataFrame(rdd, schema)
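
The escaped output below is presumably produced by writing this DataFrame as JSON, along these lines (the path is a placeholder):

# Body is a plain string column here, so the JSON writer escapes its quotes.
df.write.mode("overwrite").json("/tmp/logfile_out")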

-- Output json file --

{"Name":"Amber","d":{"Body":"{\"City\": \"Oregon\", \"Country\": \"US\"}","BodyType":"1"}}
{"Name":"Alfred","d":{"Body":"{\"Weight\": 80, \"Height\": 176}","BodyType":"2"}}

-- Output json file(ideal) --

{"Name":"Amber","d":{"Body":"{\"City\": \"Oregon\", \"Country\": \"US\"}","BodyType":"1"}, "Body":{"City": "Oregon", "Country": "US"}}
{"Name":"Alfred","d":{"Body":"{\"Weight\": 80, \"Height\": 176}","BodyType":"2"}, "Body":{"Weight": 80, "Height": 176}}

I attempted to use schema_of_json() and from_json() from pyspark.sql.functions, but it did not work (schema_of_json only accepts string literals).

-- Trial results --

from pyspark.sql.functions import schema_of_json
from pyspark.sql.functions import from_json
df = df.withColumn('Body', df.select(from_json(df.d.body,schema_of_json(df.d.Body))))

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/spark-2.4.0-bin-hadoop2.7/python/pyspark/sql/functions.py", line 2277, in from_json
    jc = sc._jvm.functions.from_json(_to_java_column(col), schema, options)
  File "/usr/local/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/local/spark-2.4.0-bin-hadoop2.7/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"Schema should be specified in DDL format as a string literal or output of the schema_of_json function instead of schemaofjson(`d`['Body']);"
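
For contrast, schema_of_json does work when given a string literal rather than a per-row column value, which is exactly the restriction the error describes. A minimal check:

from pyspark.sql.functions import schema_of_json

# Works, because the argument is a literal, not a column.
df.select(schema_of_json('{"City": "Oregon", "Country": "US"}')).show(truncate=False)
# -> struct<City:string,Country:string>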
asked Oct 27 '22 by 田村尚也

1 Answer

tl;dr

The short answer is no, there is no way to dynamically infer the schema on each row and end up with a column where different rows have different schemas.

However, there is a way to output the JSON string that you want, and to reconcile the different JSON variants into a common, richly-typed schema.

Details

If it were allowed it would be excruciatingly slow, but more importantly it is not allowed because it breaks the relational model that allows SparkSQL to operate consistently.

A dataframe consists of columns (fields), and a column has only one datatype; the datatype represents the entire column. This isn't strictly enforceable at compile time in PySpark, given Python's dynamic nature, but it does matter at run time, so the statement still applies.
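
As a quick illustration (an assumption about schema inference, not from the original post): if you let Spark infer a schema over rows whose field types disagree, it refuses outright.

# Schema inference cannot merge conflicting types into one column.
spark.createDataFrame([(1,), ("a",)])
# TypeError: field _1: Can not merge type LongType and StringType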

In your example if you wanted to project the City attribute with something like d.Body.City then that has to exist for both Alfred and Amber. At least, the metadata for that field has to exist even if there is no value. The execution engine needs to know quickly whether the path is invalid or not, to avoid pointless scanning of every row.

Some ways to reconcile multiple types in a single column are (I'm sure there are more I can't think of):

  1. Use a variant/union/option type (e.g. union all the common and uncommon json schemas together)
  2. Serialize it to something, like a json string (this is where you start before applying that json schema; great for transferring data, not great for analysis)
  3. Upcast it to a super-type, a boxed or generic object (like in an RDD) that has lowest-common-denominator behaviour/interface/properties (this loses the metadata and attributes of the sub-types)
  4. Don't store it as a single type, e.g. store the separate variations in different columns and use a different json schema for each (see the sketch below)

I like (1) in this case, but (4) could be valid as an interim step to finding a universal schema.
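
As a rough sketch of option (4), you could parse each known variant with its own schema, guarded by the BodyType discriminator. The column names and schemas here are illustrative, not from the original post:

from pyspark.sql.functions import col, from_json, when
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# One typed column per variant; rows of the other type stay null.
schema_location = StructType([StructField("City", StringType()),
                              StructField("Country", StringType())])
schema_measurements = StructType([StructField("Weight", IntegerType()),
                                  StructField("Height", IntegerType())])

df2 = df.withColumn(
    "Location",
    when(col("d.BodyType") == "1", from_json("d.Body", schema_location))
).withColumn(
    "Measurements",
    when(col("d.BodyType") == "2", from_json("d.Body", schema_measurements))
)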

Your example "common" json schema is more like option (3). Inside the map you've called "d" (I guess because it's a dict?), the information about the fields is not available without scanning the data.

root
 |-- Name: string (nullable = false)
 |-- d: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

I realise this is just an interim step towards adding a new column containing the Body, but to do that you have to enumerate all the possible keys in that map out into a more useful schema.
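
Since the map's keys are only discoverable by reading the data, one way to enumerate them (at the cost of a full scan) is something like:

from pyspark.sql.functions import explode, map_keys

# Collect the distinct keys that actually occur in the map column.
df.select(explode(map_keys("d")).alias("key")).distinct().show()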

Solution

The universal (common) schema is not a generic map of string -> string; I think it is more useful as shown below. It is close to what you tried originally, but it is not dynamic and it is valid for both rows. Notice that nullable defaults to True for all the attributes:

from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema_body = StructType([
    StructField("City", StringType()),
    StructField("Country", StringType()),
    StructField("Weight", IntegerType()),
    StructField("Height", IntegerType())
])

df = df.withColumn("Body", from_json("d.Body", schema_body))
df.printSchema()

root
 |-- Name: string (nullable = false)
 |-- d: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- Body: struct (nullable = true)
 |    |-- City: string (nullable = true)
 |    |-- Country: string (nullable = true)
 |    |-- Weight: integer (nullable = true)
 |    |-- Height: integer (nullable = true)


df.show(2, False)

+------+------------------------------------------------------------+---------------------+
|Name  |d                                                           |Body                 |
+------+------------------------------------------------------------+---------------------+
|Amber |[Body -> {"City": "Oregon", "Country": "US"}, BodyType -> 1]|[Oregon,US,null,null]|
|Alfred|[Body -> {"Weight": 80, "Height": 176}, BodyType -> 2]      |[null,null,80,176]   |
+------+------------------------------------------------------------+---------------------+

Now you can get to the City easily by selecting Body.City, without worrying which rows have City.
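
For example, a quick check:

# Amber -> Oregon, Alfred -> null
df.select("Name", "Body.City").show()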

For your next step, you could convert the struct back to a json string:

from pyspark.sql.functions import to_json

df = df.withColumn("Body", to_json("Body"))

You could also combine it with the previous step:

df = df.withColumn("Body", to_json(from_json("d.Body", schema_body)))
df.printSchema()

root
 |-- Name: string (nullable = false)
 |-- d: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- Body: string (nullable = true)

df.show(2, False)

+------+------------------------------------------------------------+--------------------------------+
|Name  |d                                                           |Body                            |
+------+------------------------------------------------------------+--------------------------------+
|Amber |[Body -> {"City": "Oregon", "Country": "US"}, BodyType -> 1]|{"City":"Oregon","Country":"US"}|
|Alfred|[Body -> {"Weight": 80, "Height": 176}, BodyType -> 2]      |{"Weight":80,"Height":176}      |
+------+------------------------------------------------------------+--------------------------------+

Notice that when converting back to a json string, the NULL values are gone. It's also now a json string, so it is easy to write to a file as you wanted.
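
If what you want in the file is the ideal output from the question (Body as a nested object, with no escape characters), one way is to write the struct version of Body rather than the re-serialized string; the JSON writer then emits it as a nested object. A sketch with a placeholder path:

# Keep Body as the struct produced by from_json, then write as JSON.
df_out = df.withColumn("Body", from_json("d.Body", schema_body))
df_out.select("Name", "d", "Body").write.mode("overwrite").json("/tmp/out")
# {"Name":"Amber","d":{...},"Body":{"City":"Oregon","Country":"US"}}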


Going further

If you were doing this as part of a process to make the data accessible for analysis, reporting or other purposes, I would do something like this:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField('Name', StringType(), False),
    StructField(
        'd',
        StructType([
            StructField("Body", StringType()),
            StructField("BodyType", IntegerType())
        ])
    )
])

df = spark.createDataFrame(rdd, schema)
df = df.withColumn(
    "Body", 
    from_json("d.Body", schema_body)
).withColumn(
    "BodyType", 
    col("d.BodyType")
).drop("d")

df.printSchema()

root
 |-- Name: string (nullable = false)
 |-- Body: struct (nullable = true)
 |    |-- City: string (nullable = true)
 |    |-- Country: string (nullable = true)
 |    |-- Weight: integer (nullable = true)
 |    |-- Height: integer (nullable = true)
 |-- BodyType: integer (nullable = true)


df.show(2, False)

+------+---------------------+--------+
|Name  |Body                 |BodyType|
+------+---------------------+--------+
|Amber |[Oregon,US,null,null]|1       |
|Alfred|[null,null,80,176]   |2       |
+------+---------------------+--------+

Then you can select Body.City, Body.Country, Body.Weight, Body.Height.
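
For instance, to look at only the rows where a given attribute was present:

from pyspark.sql.functions import col

# Rows whose original Body json contained a City.
df.where(col("Body.City").isNotNull()).select("Name", "Body.City", "Body.Country").show()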

You could go one more step, but that would really depend on how many of these possible Body keys there are and how sparse the data is:

df = df.withColumn(
    "City", col("Body.City")
).withColumn(
    "Country", col("Body.Country")
).withColumn(
    "Weight", col("Body.Weight")
).withColumn(
    "Height", col("Body.Height")
).drop("Body")

df.printSchema()

root
 |-- Name: string (nullable = false)
 |-- BodyType: integer (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- Height: integer (nullable = true)

df.show(2, False)

+------+--------+------+-------+------+------+
|Name  |BodyType|City  |Country|Weight|Height|
+------+--------+------+-------+------+------+
|Amber |1       |Oregon|US     |null  |null  |
|Alfred|2       |null  |null   |80    |176   |
+------+--------+------+-------+------+------+
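
Since the original goal was loading into Redshift: with a flat, typed schema like this, the load is straightforward. A sketch using the spark-redshift connector; the JDBC URL, table name, and tempdir below are all assumptions about your setup, not from the original post:

# All connection details are placeholders.
df.write \
    .format("com.databricks.spark.redshift") \
    .option("url", "jdbc:redshift://host:5439/db?user=user&password=pass") \
    .option("dbtable", "log_table") \
    .option("tempdir", "s3a://bucket/tmp") \
    .mode("append") \
    .save()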
answered Nov 15 '22 by Davos