I am trying to generate a JSON string from a nested PySpark DataFrame, but I am losing key values. My initial dataset is similar to the following:
data = [
{"foo": [1, 2], "bar": [4, 5], "buzz": [7, 8]},
{"foo": [1], "bar": [4], "buzz": [7]},
{"foo": [1, 2, 3], "bar": [4, 5, 6], "buzz": [7, 8, 9]},
]
df = spark.read.json(sc.parallelize(data))
df.show()
## +---------+---------+---------+
## | bar| buzz| foo|
## +---------+---------+---------+
## | [4, 5]| [7, 8]| [1, 2]|
## | [4]| [7]| [1]|
## |[4, 5, 6]|[7, 8, 9]|[1, 2, 3]|
## +---------+---------+---------+
I then zip each of the columns together using arrays_zip:
from pyspark.sql import functions as F

df_zipped = (
df
.withColumn(
"zipped",
F.arrays_zip(
F.col("foo"),
F.col("bar"),
F.col("buzz"),
)
)
)
df_zipped.printSchema()
root
|-- bar: array (nullable = true)
| |-- element: long (containsNull = true)
|-- buzz: array (nullable = true)
| |-- element: long (containsNull = true)
|-- foo: array (nullable = true)
| |-- element: long (containsNull = true)
|-- zipped: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- foo: long (nullable = true)
| | |-- bar: long (nullable = true)
| | |-- buzz: long (nullable = true)
The problem comes when using to_json on the zipped array. It loses the foo, bar, and buzz keys and instead writes each element's index as the key:
(
df_zipped
.withColumn("zipped", F.to_json("zipped"))
.select("zipped")
.show(truncate=False)
)
+-------------------------------------------------------------+
|zipped |
+-------------------------------------------------------------+
|[{"0":1,"1":4,"2":7},{"0":2,"1":5,"2":8}] |
|[{"0":1,"1":4,"2":7}] |
|[{"0":1,"1":4,"2":7},{"0":2,"1":5,"2":8},{"0":3,"1":6,"2":9}]|
+-------------------------------------------------------------+
How do I keep "bar", "buzz", and "foo" instead of the 0, 1, 2?
Manually specifying the schema works too. It seems the issue is that for the foo, bar, and buzz fields, only the arrays wrapping the elements were named, not the actual struct fields themselves.
data = [
{"foo": [1, 2], "bar": [4, 5], "buzz": [7, 8]},
{"foo": [1], "bar": [4], "buzz": [7]},
{"foo": [1, 2, 3], "bar": [4, 5, 6], "buzz": [7, 8, 9]},
]
df = spark.read.json(sc.parallelize(data))
df.show()
+---------+---------+---------+
| bar| buzz| foo|
+---------+---------+---------+
| [4, 5]| [7, 8]| [1, 2]|
| [4]| [7]| [1]|
|[4, 5, 6]|[7, 8, 9]|[1, 2, 3]|
+---------+---------+---------+
Then defining and casting to the schema manually:
from pyspark.sql.types import StructType, StructField, IntegerType, ArrayType

schema = StructType([
StructField("foo", IntegerType()),
StructField("bar", IntegerType()),
StructField("buzz", IntegerType()),
])
df_zipped = (
df
.select(
F.arrays_zip(
F.col("foo"),
F.col("bar"),
F.col("buzz"),
)
.alias("zipped")
)
.filter(F.col("zipped").isNotNull())
.select(F.col("zipped").cast(ArrayType(schema)))
)
This results in the desired solution:
(
df_zipped
.withColumn("zipped", F.to_json("zipped"))
.select("zipped")
.show(truncate=False)
)
+----------------------------------------------------------------------------------+
|zipped |
+----------------------------------------------------------------------------------+
|[{"foo":1,"bar":4,"buzz":7},{"foo":2,"bar":5,"buzz":8}] |
|[{"foo":1,"bar":4,"buzz":7}] |
|[{"foo":1,"bar":4,"buzz":7},{"foo":2,"bar":5,"buzz":8},{"foo":3,"bar":6,"buzz":9}]|
+----------------------------------------------------------------------------------+
Note: casting as LongType in the schema does not work.
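For reference, this is roughly the variant that note refers to (a sketch only; per the observation above, this schema reportedly still produces the 0/1/2 keys):

from pyspark.sql.types import StructType, StructField, LongType

# Per the note above, this variant reportedly does NOT restore the
# foo/bar/buzz field names, even though the source arrays contain longs.
schema_long = StructType([
    StructField("foo", LongType()),
    StructField("bar", LongType()),
    StructField("buzz", LongType()),
])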
This is not a super pretty answer (because you have to specify the keys explicitly), but better than what I put in the comments. Use transform with map:
df_zipped.withColumn(
"zipped",
F.to_json(
F.expr(
"""transform(zipped, x -> map('foo', x['foo'], 'bar', x['bar'], 'buzz', x['buzz']))"""
)
)
).select('zipped').show(truncate=False)
#+----------------------------------------------------------------------------------+
#|zipped |
#+----------------------------------------------------------------------------------+
#|[{"foo":1,"bar":4,"buzz":7},{"foo":2,"bar":5,"buzz":8}] |
#|[{"foo":1,"bar":4,"buzz":7}] |
#|[{"foo":1,"bar":4,"buzz":7},{"foo":2,"bar":5,"buzz":8},{"foo":3,"bar":6,"buzz":9}]|
#+----------------------------------------------------------------------------------+
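If you prefer to stay in the DataFrame API rather than a SQL expression string, the same idea can be written with the Python transform and create_map helpers. This is a minimal sketch, assuming Spark 3.1+ (where pyspark.sql.functions.transform exists); df_named is just an illustrative name:

from pyspark.sql import functions as F

# Same approach as the expr() version above: rebuild each struct as a
# map keyed by the explicit column names, then serialize with to_json.
df_named = df_zipped.withColumn(
    "zipped",
    F.to_json(
        F.transform(
            "zipped",
            lambda x: F.create_map(
                F.lit("foo"), x["foo"],
                F.lit("bar"), x["bar"],
                F.lit("buzz"), x["buzz"],
            ),
        )
    ),
)
df_named.select("zipped").show(truncate=False)

On Spark versions older than 3.1 the expr-based transform above is the way to go, since the Python transform helper is not available there.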