Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

PySpark to_json loses column name of struct inside array

I am trying to generate a json string from a nested pyspark DataFrame, but am losing key values. My initial Dataset is similar to the following:

data = [
    {"foo": [1, 2], "bar": [4, 5], "buzz": [7, 8]},
    {"foo": [1], "bar": [4], "buzz": [7]},
    {"foo": [1, 2, 3], "bar": [4, 5, 6], "buzz": [7, 8, 9]},
]
df = spark.read.json(sc.parallelize(data))
df.show()
## +---------+---------+---------+
## |      bar|     buzz|      foo|
## +---------+---------+---------+
## |   [4, 5]|   [7, 8]|   [1, 2]|
## |      [4]|      [7]|      [1]|
## |[4, 5, 6]|[7, 8, 9]|[1, 2, 3]|
## +---------+---------+---------+

I then zip each of the columns together using arrays_zip:

df_zipped = (
    df
    .withColumn(
        "zipped",
        F.arrays_zip(
            F.col("foo"),
            F.col("bar"),
            F.col("buzz"),
        )
    )
)
df_zipped.printSchema()
root
 |-- bar: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- buzz: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- foo: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- zipped: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- foo: long (nullable = true)
 |    |    |-- bar: long (nullable = true)
 |    |    |-- buzz: long (nullable = true)

The problem comes when using to_json on the zipped array: it loses the foo, bar, and buzz key names and instead uses each element's index as the key.

(
    df_zipped
    .withColumn("zipped", F.to_json("zipped"))
    .select("zipped")
    .show(truncate=False)
)
+-------------------------------------------------------------+
|zipped                                                       |
+-------------------------------------------------------------+
|[{"0":1,"1":4,"2":7},{"0":2,"1":5,"2":8}]                    |
|[{"0":1,"1":4,"2":7}]                                        |
|[{"0":1,"1":4,"2":7},{"0":2,"1":5,"2":8},{"0":3,"1":6,"2":9}]|
+-------------------------------------------------------------+

How do I keep "bar", "buzz", and "foo" instead of the 0, 1, 2?

like image 294
Justin Davis Avatar asked Oct 18 '25 15:10

Justin Davis


2 Answers

Manually specifying the schema works too. For the foo, bar, and buzz fields, the names appear to have been attached to the arrays being zipped rather than to the actual data fields inside each struct, so the struct fields need to be named explicitly.

data = [
    {"foo": [1, 2], "bar": [4, 5], "buzz": [7, 8]},
    {"foo": [1], "bar": [4], "buzz": [7]},
    {"foo": [1, 2, 3], "bar": [4, 5, 6], "buzz": [7, 8, 9]},
]
df = spark.read.json(sc.parallelize(data))
df.show()
+---------+---------+---------+
|      bar|     buzz|      foo|
+---------+---------+---------+
|   [4, 5]|   [7, 8]|   [1, 2]|
|      [4]|      [7]|      [1]|
|[4, 5, 6]|[7, 8, 9]|[1, 2, 3]|
+---------+---------+---------+

Then defining and casting to the schema manually:

schema = StructType([
    StructField("foo", IntegerType()),
    StructField("bar", IntegerType()),
    StructField("buzz", IntegerType()),
])

# Zip the three arrays into one array of structs, then cast that array to the
# explicitly named `schema` so each struct field is called foo/bar/buzz and
# to_json emits those names as keys.
df_zipped = (
    df  # the DataFrame built above; `df_test` was never defined
    .select(
        F.arrays_zip(
            F.col("foo"),
            F.col("bar"),
            F.col("buzz"),
        ).alias("zipped")
    )
    # Drop rows where the zipped array is null before casting.
    .filter(F.col("zipped").isNotNull())
    .select(F.col("zipped").cast(ArrayType(schema)))
)

This results in the desired solution:

(
    df_zipped
    .withColumn("zipped", F.to_json("zipped"))
    .select("zipped")
    .show(truncate=False)
)
+----------------------------------------------------------------------------------+
|zipped                                                                            |
+----------------------------------------------------------------------------------+
|[{"foo":1,"bar":4,"buzz":7},{"foo":2,"bar":5,"buzz":8}]                           |
|[{"foo":1,"bar":4,"buzz":7}]                                                      |
|[{"foo":1,"bar":4,"buzz":7},{"foo":2,"bar":5,"buzz":8},{"foo":3,"bar":6,"buzz":9}]|
+----------------------------------------------------------------------------------+

Note: casting to LongType in the schema does not work.

like image 96
Justin Davis Avatar answered Oct 21 '25 04:10

Justin Davis


This is not a super pretty answer (because you have to specify the keys explicitly), but better than what I put in the comments.

Use transform with map:

# Rebuild every zipped element as a map whose keys are spelled out explicitly,
# so the JSON produced by to_json carries "foo"/"bar"/"buzz" as keys.
named_entries = F.expr(
    """transform(zipped, x -> map('foo', x['foo'], 'bar', x['bar'], 'buzz', x['buzz']))"""
)
(
    df_zipped
    .withColumn("zipped", F.to_json(named_entries))
    .select("zipped")
    .show(truncate=False)
)
#+----------------------------------------------------------------------------------+
#|zipped                                                                            |
#+----------------------------------------------------------------------------------+
#|[{"foo":1,"bar":4,"buzz":7},{"foo":2,"bar":5,"buzz":8}]                           |
#|[{"foo":1,"bar":4,"buzz":7}]                                                      |
#|[{"foo":1,"bar":4,"buzz":7},{"foo":2,"bar":5,"buzz":8},{"foo":3,"bar":6,"buzz":9}]|
#+----------------------------------------------------------------------------------+
like image 43
pault Avatar answered Oct 21 '25 06:10

pault