I would like to write my Spark DataFrame as a set of JSON files, and in particular have each file be an array of JSON objects. Let me explain with some simple (reproducible) code.
We have:
import numpy as np
import pandas as pd
df = spark.createDataFrame(pd.DataFrame({'x': np.random.rand(100), 'y': np.random.rand(100)}))
Saving the dataframe as:
df.write.json('s3://path/to/json')
each file just created has one JSON object per line, something like:
{"x":0.9953802385540144,"y":0.476027611419198}
{"x":0.929599290575914,"y":0.72878523939521}
{"x":0.951701684432855,"y":0.8008064729546504}
but I would like to have an array of those JSON objects per file:
[
{"x":0.9953802385540144,"y":0.476027611419198},
{"x":0.929599290575914,"y":0.72878523939521},
{"x":0.951701684432855,"y":0.8008064729546504}
]
It is not currently possible to have Spark "natively" write a single file in your desired format, because Spark works in a distributed (parallel) fashion, with each executor writing its part of the data independently.
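For instance, here is a small illustration (assuming the df from above; the output path is just a placeholder) showing that the number of part files tracks the number of partitions:
# Each task writes the partition it holds, so the number of part files
# produced by df.write.json roughly matches the partition count.
print(df.rdd.getNumPartitions())
# Repartitioning changes how many part files are written, but each file
# is still newline-delimited JSON, not a JSON array.
df.repartition(4).write.json('s3://path/to/json_parts')  # illustrative path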
However, since you are okay with having each output file be an array of JSON objects rather than requiring a single file, here is one workaround that you can use to achieve your desired output:
from pyspark.sql.functions import to_json, spark_partition_id, collect_list, col, struct
df.select(to_json(struct(*df.columns)).alias("json"))\
.groupBy(spark_partition_id())\
.agg(collect_list("json").alias("json_list"))\
.select(col("json_list").cast("string"))\
.write.text("s3://path/to/json")
First you create a JSON string from all of the columns in df. Then group by the Spark partition ID and aggregate using collect_list. This will put all the JSON strings on that partition into a list. Since you're aggregating within the partition, there should be no shuffling of data required. Now select the list column, cast it to a string, and write it as a text file.
Here's an example of how one file looks:
[{"x":0.1420523746714616,"y":0.30876114874052263}, ... ]
Note you may get some empty files.
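If you want to sanity-check the result, you should be able to read the array-style files back with Spark's multiLine JSON reader, which loads each file as a whole and expands a top-level JSON array into one row per element (a quick verification sketch, not part of the workaround itself; empty part files may surface as corrupt records and can be filtered out):
# Read the array-of-JSON files back for verification.
df_back = spark.read.option("multiLine", True).json("s3://path/to/json")
df_back.show(3)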
Presumably you can force Spark to write the data to ONE file if you specify an empty groupBy(), but this would force all of the data into a single partition, which could result in an out-of-memory error.
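For completeness, a rough sketch of that single-file variant might look like the following (untested; the empty groupBy(), the coalesce(1), and the output path are my assumptions on top of the code above, and the same memory caveat applies since every row ends up in one group):
# Untested sketch: an empty groupBy() aggregates everything into one group,
# and coalesce(1) forces a single output file; all rows land on one executor.
df.select(to_json(struct(*df.columns)).alias("json"))\
    .groupBy()\
    .agg(collect_list("json").alias("json_list"))\
    .select(col("json_list").cast("string"))\
    .coalesce(1)\
    .write.text("s3://path/to/json_single")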