Currently I use PySpark to format a log file and then load it into Redshift.
Each record of the log file is output in JSON format; I parse each item, add an item, and load the result into Redshift. However, the format of some items differs by type (for the common items, a schema is applied beforehand). If I output the data as-is, escape characters get inserted. Is there a way to dynamically create the schema information and output a JSON file that has no escape characters?
-- Environment --
- spark 2.4.0
- python version 2.7.15
-- DataFrame --
>> df.printSchema()
root
 |-- Name: string (nullable = false)
 |-- d: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
>> df.show(2, False)
+------+------------------------------------------------------------+
|Name  |d                                                           |
+------+------------------------------------------------------------+
|Amber |[Body -> {"City": "Oregon", "Country": "US"}, BodyType -> 1]|
|Alfred|[Body -> {"Weight": 80, "Height": 176}, BodyType -> 2]      |
+------+------------------------------------------------------------+
-- Schema (for the common items) --
>> print(json.dumps(schema.jsonValue(), indent=2))
{
  "fields": [
    {
      "metadata": {},
      "type": "string",
      "name": "Name",
      "nullable": false
    },
    {
      "metadata": {},
      "type": {
        "keyType": "string",
        "type": "map",
        "valueType": "string",
        "valueContainsNull": true
      },
      "name": "d",
      "nullable": false
    }
  ],
  "type": "struct"
}
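Incidentally, that jsonValue() dump round-trips: StructType.fromJson() rebuilds the schema object, which is one way to keep the per-type schemas that are "applied beforehand". A minimal sketch (schema_json_text, holding the dump above as a string, is a hypothetical name):

import json
from pyspark.sql.types import StructType

# Rebuild the StructType from the stored jsonValue() dict.
schema = StructType.fromJson(json.loads(schema_json_text))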
-- Code --
from pyspark.sql.types import *

rdd = sc.parallelize([("Amber", {"Body": "{\"City\": \"Oregon\", \"Country\": \"US\"}", "BodyType": 1}),
                      ("Alfred", {"Body": "{\"Weight\": 80, \"Height\": 176}", "BodyType": 2})])
schema = StructType([StructField('Name', StringType(), False),
                     StructField('d', MapType(StringType(), StringType()), False)])
df = spark.createDataFrame(rdd, schema)
-- Output json file --
{"Name":"Amber","d":{"Body":"{\"City\": \"Oregon\", \"Country\": \"US\"}","BodyType":"1"}}
{"Name":"Alfred","d":{"Body":"{\"Weight\": 80, \"Height\": 176}","BodyType":"2"}}
-- Output json file (ideal) --
{"Name":"Amber","d":{"Body":"{\"City\": \"Oregon\", \"Country\": \"US\"}","BodyType":"1"}, "Body":{"City": "Oregon", "Country": "US"}}
{"Name":"Alfred","d":{"Body":"{\"Weight\": 80, \"Height\": 176}","BodyType":"2"}, "Body":{"Weight": 80, "Height": 176}}
I attempted to use schema_of_json() and from_json() from pyspark.sql.functions, but it did not work (schema_of_json only accepts string literals).
-- Trial results --
from pyspark.sql.functions import schema_of_json
from pyspark.sql.functions import from_json
df = df.withColumn('Body', df.select(from_json(df.d.body,schema_of_json(df.d.Body))))
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/spark-2.4.0-bin-hadoop2.7/python/pyspark/sql/functions.py", line 2277, in from_json
jc = sc._jvm.functions.from_json(_to_java_column(col), schema, options)
File "/usr/local/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/usr/local/spark-2.4.0-bin-hadoop2.7/python/pyspark/sql/utils.py", line 69, in deco
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"Schema should be specified in DDL format as a string literal or output of the schema_of_json function instead of schemaofjson(`d`['Body']);"
The short answer is no: there is no way to dynamically infer the schema on each row and end up with a column where different rows have different schemas.
However, there is a way to output the json string that you want, and to reconcile the different json into a common, richly-typed schema.
If per-row inference were allowed it would be excruciatingly slow, but more importantly it is not allowed because it breaks the relational model that allows SparkSQL to operate consistently.
A dataframe consists of columns (fields), and a column has only one datatype; the datatype represents the entire column. Given Python's nature this is not strictly enforceable in Pyspark, but it matters at run time, so the statement still applies.
In your example, if you wanted to project the City attribute with something like d.Body.City, then that path has to exist for both Alfred and Amber. At least the metadata for that field has to exist, even if there is no value. The execution engine needs to know quickly whether the path is invalid or not, to avoid pointlessly scanning every row.
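As a quick illustration (a hypothetical session against the original map schema, where the values of d are plain strings):

# Body is just a string inside the map, so the nested path cannot resolve;
# Spark rejects it at analysis time rather than scanning any rows:
df.select("d.Body.City")
# AnalysisException (roughly): Can't extract value from d[Body]: need struct type but got string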
Some ways to reconcile multiple types in a single column are (I'm sure there are more I can't think of):
I like (1) in this case, but (4) could be valid as an interim step to finding a universal schema.
Your example "common" json schema is more like option (3). Inside the map you've called "d" (I guess because it's a dict?), the information about the fields is not available without scanning the data.
root
 |-- Name: string (nullable = false)
 |-- d: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
I realise this is just an interim step towards adding a new column containing the Body, but to do that you have to enumerate all the possible keys in that map out into a more useful schema.
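If you do need to discover those keys dynamically, a minimal sketch (it costs the full scan described above; the variable names are mine, not from the question):

from pyspark.sql.functions import explode, map_keys

# Collect the distinct keys that actually occur in the map column "d".
keys = [r[0] for r in df.select(explode(map_keys("d"))).distinct().collect()]
# e.g. ['Body', 'BodyType']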
The universal (common) schema is not a generic map of string -> string; I think it is more useful like the one below. It is close to what you tried originally, but not dynamic, and it is valid for both rows. Notice that nullable is left at the default True for all the attributes.
from pyspark.sql.functions import from_json

schema_body = StructType([
    StructField("City", StringType()),
    StructField("Country", StringType()),
    StructField("Weight", IntegerType()),
    StructField("Height", IntegerType())
])

df = df.withColumn("Body", from_json("d.Body", schema_body))
df.printSchema()
root
 |-- Name: string (nullable = false)
 |-- d: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- Body: struct (nullable = true)
 |    |-- City: string (nullable = true)
 |    |-- Country: string (nullable = true)
 |    |-- Weight: integer (nullable = true)
 |    |-- Height: integer (nullable = true)
df.show(2, False)
+------+---------------------------------------------------------------+---------------------+
|Name  |d                                                              |Body                 |
+------+---------------------------------------------------------------+---------------------+
|Amber |Map(Body -> {"City": "Oregon", "Country": "US"}, BodyType -> 1)|[Oregon,US,null,null]|
|Alfred|Map(Body -> {"Weight": 80, "Height": 176}, BodyType -> 2)      |[null,null,80,176]   |
+------+---------------------------------------------------------------+---------------------+
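If you would rather not write schema_body by hand, a hedged alternative (my addition, not part of the approach above; it deliberately pays for one full scan) is to let the json reader infer a merged schema over all the Body strings:

# Infer a union schema over every Body string. spark.read.json merges
# the per-row shapes into one nullable struct (types are inferred, so
# Weight/Height may come back as long rather than integer).
body_strings = df.select(df.d["Body"]).rdd.map(lambda r: r[0])
schema_body = spark.read.json(body_strings).schema

# A related workaround for the error in your trial: schema_of_json needs
# a foldable (literal) argument, so a sampled value works, though it only
# describes that one row's shape.
from pyspark.sql.functions import schema_of_json, lit
sample = df.select(df.d["Body"]).first()[0]
one_row_schema = df.select(schema_of_json(lit(sample))).first()[0]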
Now you can get to the City easily by selecting Body.City, without worrying about which rows have a City key.
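For example (a small usage sketch; both paths come from schema_body above):

# City resolves for every row now; rows without it just return null.
df.select("Name", "Body.City", "Body.Country").show()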
For your next step you could turn it back into a json string:
from pyspark.sql.functions import to_json

df = df.withColumn("Body", to_json("Body"))
You could also combine it with the previous step:
df = df.withColumn("Body", to_json(from_json("d.Body", schema_body)))
df.printSchema()
root
 |-- Name: string (nullable = false)
 |-- d: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- Body: string (nullable = true)
df.show(2, False)
+------+---------------------------------------------------------------+--------------------------------+
|Name  |d                                                              |Body                            |
+------+---------------------------------------------------------------+--------------------------------+
|Amber |Map(Body -> {"City": "Oregon", "Country": "US"}, BodyType -> 1)|{"City":"Oregon","Country":"US"}|
|Alfred|Map(Body -> {"Weight": 80, "Height": 176}, BodyType -> 2)      |{"Weight":80,"Height":176}      |
+------+---------------------------------------------------------------+--------------------------------+
Notice that when converting back to a json string, the NULL values are gone. It's also now a json string, so it's easy to write to a file, as you wanted.
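That also gives you the escape-free file from your ideal output: keep Body as a struct (not a to_json string) when writing, and the json writer emits it as a nested object with no escaped quotes. A sketch, assuming the dataframe with the map column d and schema_body from above (the output path is just a placeholder):

from pyspark.sql.functions import from_json

out = df.select("Name", "d", from_json("d.Body", schema_body).alias("Body"))
out.write.mode("overwrite").json("/tmp/logfile_out")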
If you were doing this as part of a process to make the data accessible for analysis, reporting or other purposes, I would do something like this:
from pyspark.sql.functions import col, from_json

schema = StructType([
    StructField('Name', StringType(), False),
    StructField(
        'd',
        StructType([
            StructField("Body", StringType()),
            StructField("BodyType", IntegerType())
        ])
    )
])

df = spark.createDataFrame(rdd, schema)

df = df.withColumn(
    "Body",
    from_json("d.Body", schema_body)
).withColumn(
    "BodyType",
    col("d.BodyType")
).drop("d")
df.printSchema()
root
 |-- Name: string (nullable = false)
 |-- Body: struct (nullable = true)
 |    |-- City: string (nullable = true)
 |    |-- Country: string (nullable = true)
 |    |-- Weight: integer (nullable = true)
 |    |-- Height: integer (nullable = true)
 |-- BodyType: integer (nullable = true)
df.show(2, False)
+------+---------------------+--------+
|Name  |Body                 |BodyType|
+------+---------------------+--------+
|Amber |[Oregon,US,null,null]|1       |
|Alfred|[null,null,80,176]   |2       |
+------+---------------------+--------+
Then you can select Body.City, Body.Country, Body.Weight, Body.Height.
You could go one more step, but that would really depend on how many of these possible Body keys there are and how sparse it is.
df = df.withColumn(
"City", col("Body.City")
).withColumn(
"Country", col("Body.Country")
).withColumn(
"Weight", col("Body.Weight")
).withColumn(
"Height", col("Body.Height")
).drop("Body")
df.printSchema()
root
 |-- Name: string (nullable = false)
 |-- BodyType: integer (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- Height: integer (nullable = true)
df.show(2, False)
+------+--------+------+-------+------+------+
|Name  |BodyType|City  |Country|Weight|Height|
+------+--------+------+-------+------+------+
|Amber |1       |Oregon|US     |null  |null  |
|Alfred|2       |null  |null   |80    |176   |
+------+--------+------+-------+------+------+
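Since the wider goal is loading Redshift, a flat frame like this maps one column to one table column. A hedged sketch using the databricks spark-redshift connector (every connection detail below is a placeholder, not something from the question):

df.write \
    .format("com.databricks.spark.redshift") \
    .option("url", "jdbc:redshift://example-cluster:5439/dev?user=user&password=pass") \
    .option("dbtable", "log_body") \
    .option("tempdir", "s3a://example-bucket/tmp/") \
    .option("forward_spark_s3_credentials", "true") \
    .mode("append") \
    .save()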