
write spark dataframe as array of json (pyspark)

I would like to write my Spark dataframe as a set of JSON files, and in particular to have each file be an array of JSON objects. Let me explain with a simple (reproducible) example.

We have:

import numpy as np
import pandas as pd
df = spark.createDataFrame(pd.DataFrame({'x': np.random.rand(100), 'y': np.random.rand(100)}))
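
(This assumes a live SparkSession named spark, as in the pyspark shell or a notebook. In a standalone script you would create one first; a minimal sketch, with a made-up app name:)

from pyspark.sql import SparkSession

# only needed outside the shell/notebook, where `spark` already exists
spark = SparkSession.builder.appName("json-array-demo").getOrCreate()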

Saving the dataframe as:

df.write.json('s3://path/to/json')

each file just created has one JSON object per line, something like:

{"x":0.9953802385540144,"y":0.476027611419198}
{"x":0.929599290575914,"y":0.72878523939521}
{"x":0.951701684432855,"y":0.8008064729546504}

but I would like each file to be an array of those JSON objects:

[
   {"x":0.9953802385540144,"y":0.476027611419198},
   {"x":0.929599290575914,"y":0.72878523939521},
   {"x":0.951701684432855,"y":0.8008064729546504}
]
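
(For reference, the target layout is just plain JSON, so a file in that shape could be loaded back with Python's standard json module; the filename below is a placeholder:)

import json

# parses the whole file as one JSON array of objects
with open("part-00000.txt") as f:
    rows = json.load(f)
print(rows[0]["x"])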
asked Oct 04 '19 by enneppi

1 Answer

It is not currently possible to have Spark "natively" write a single file in your desired format, because Spark works in a distributed (parallel) fashion, with each executor writing its own part of the data independently.

However, since you are okay with each output file (rather than only one file) being an array of JSON, here is one workaround you can use to achieve your desired output:

from pyspark.sql.functions import to_json, spark_partition_id, collect_list, col, struct

(
    df.select(to_json(struct(*df.columns)).alias("json"))  # each row -> one JSON string
    .groupBy(spark_partition_id())                         # group rows by their current partition
    .agg(collect_list("json").alias("json_list"))          # gather that partition's JSON strings into an array
    .select(col("json_list").cast("string"))               # render the array as one "[...]" string
    .write.text("s3://path/to/json")
)

First, to_json(struct(*df.columns)) turns each row into one JSON string. Then group by the Spark partition ID and aggregate using collect_list; this puts all of the JSON strings on a given partition into a single list. Since you're aggregating within the partition, there should be no shuffling of data required.
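
If you want a quick look at how the rows are spread across partitions (and therefore how many arrays to expect), one small sketch:

from pyspark.sql.functions import spark_partition_id

# one output row per partition: its ID and how many dataframe rows it holds
df.groupBy(spark_partition_id().alias("pid")).count().show()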

Now select the list column, convert to a string, and write it as a text file.
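
As a side note, the array-to-string cast separates the elements with a comma and a space. If you want exact control over the delimiters, one alternative sketch (only the final select changes) is to build the string explicitly:

from pyspark.sql.functions import concat, concat_ws, lit, col

# drop-in replacement for the col("json_list").cast("string") step above
explicit = concat(lit("["), concat_ws(",", col("json_list")), lit("]"))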

Here's an example of how one file looks:

[{"x":0.1420523746714616,"y":0.30876114874052263}, ... ]

Note that you may get some empty files (typically from shuffle partitions that received no data).
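
If you later need to read these files back into Spark, the JSON reader can parse a whole file as one document (here, an array of objects) with the multiLine option. A sketch, assuming the same path as above:

# multiLine=True parses each file as a single JSON document
df_back = spark.read.option("multiLine", True).json("s3://path/to/json")
df_back.show(3)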


Presumably you could force Spark to write the data to ONE file by specifying an empty groupBy(), but this would force all of the data into a single partition, which could result in an out-of-memory error.
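
For completeness, a minimal sketch of that single-file variant (every row is collected into one list, so this only works when the whole dataset fits in one task's memory; the output path is a placeholder):

from pyspark.sql.functions import to_json, struct, collect_list, col

# empty groupBy() = one global group = one list = one array in one file
(
    df.select(to_json(struct(*df.columns)).alias("json"))
    .groupBy()
    .agg(collect_list("json").alias("json_list"))
    .select(col("json_list").cast("string"))
    .write.text("s3://path/to/json_single")
)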

answered Sep 22 '22 by pault