Retain raw JSON as column in Spark DataFrame on read/load?

I have been looking for a way to add my raw (JSON) data as a column when reading my data into a Spark DataFrame. I have one way to do this with a join but am hoping there is a way to do this in a single operation using Spark 2.2.x+.

So for example data:

{"team":"Golden Knights","colors":"gold,red,black","origin":"Las Vegas"}
{"team":"Sharks","origin": "San Jose", "eliminated":"true"}
{"team":"Wild","colors":"red,green,gold","origin":"Minnesota"}
{"team":"Capitals","colors":"red,white,blue","origin":"District of Columbia","eliminated":"false"}

When executing:

val logs = sc.textFile("/Users/vgk/data/tiny.json") // example data file
spark.read.json(logs).show

Predictably we get:

+--------------+----------+--------------------+--------------+
|        colors|eliminated|              origin|          team|
+--------------+----------+--------------------+--------------+
|gold,red,black|      null|           Las Vegas|Golden Knights|
|          null|      true|            San Jose|        Sharks|
|red,green,gold|      null|           Minnesota|          Wild|
|red,white,blue|     false|District of Columbia|      Capitals|
+--------------+----------+--------------------+--------------+

What I'd like to have on initial load is the above, but with the raw JSON data as an additional column. For example (truncated raw values):

+--------------+----------+--------------------+--------------+--------------------+
|        colors|eliminated|              origin|          team|               value|
+--------------+----------+--------------------+--------------+--------------------+
|red,white,blue|     false|District of Columbia|      Capitals|{"colors":"red,wh...|
|gold,red,black|      null|           Las Vegas|Golden Knights|{"colors":"gold,r...|
|          null|      true|            San Jose|        Sharks|{"eliminated":"tr...|
|red,green,gold|      null|           Minnesota|          Wild|{"colors":"red,gr...|
+--------------+----------+--------------------+--------------+--------------------+

A non-ideal solution involves a join:

import org.apache.spark.sql.functions.monotonically_increasing_id

val logs = sc.textFile("/Users/vgk/data/tiny.json")
val df = spark.read.json(logs).withColumn("uniqueID", monotonically_increasing_id())
val rawdf = df.toJSON.withColumn("uniqueID", monotonically_increasing_id())
df.join(rawdf, "uniqueID")

This results in the same dataframe as above, but with an added uniqueID column. It also relies on monotonically_increasing_id producing matching ids for two independently evaluated DataFrames, which only holds when their partitioning lines up. Additionally, the JSON is re-rendered from the DataFrame rather than taken from the source, so it is not necessarily the "raw" data. In practice the two are equal, but for my use case the actual raw data is preferable.

Is anyone aware of a solution that will capture the raw JSON data as an additional column on load?

asked May 07 '18 by reverend

1 Answer

If you have a schema for the data you receive, then you can use from_json with that schema to extract all the fields while keeping the raw field as it is:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import spark.implicits._

val logs = spark.sparkContext.textFile(path) // example data file

val schema = StructType(
  StructField("team", StringType, true) ::
  StructField("colors", StringType, true) ::
  StructField("eliminated", StringType, true) ::
  StructField("origin", StringType, true) :: Nil
)

logs.toDF("values")
    .withColumn("json", from_json($"values", schema))
    .select("values", "json.*")
    .show(false)

Output:

+------------------------------------------------------------------------+--------------+--------------+----------+---------+
|values                                                                  |team          |colors        |eliminated|origin   |
+------------------------------------------------------------------------+--------------+--------------+----------+---------+
|{"team":"Golden Knights","colors":"gold,red,black","origin":"Las Vegas"}|Golden Knights|gold,red,black|null      |Las Vegas|
|{"team":"Sharks","origin": "San Jose", "eliminated":"true"}             |Sharks        |null          |true      |San Jose |
|{"team":"Wild","colors":"red,green,gold","origin":"Minnesota"}          |Wild          |red,green,gold|null      |Minnesota|
+------------------------------------------------------------------------+--------------+--------------+----------+---------+
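If you would rather not hand-write the schema, a variant of the same idea (a sketch, not part of the original answer, and untested) is to let Spark infer the schema in a separate pass and then parse the raw text with it:

```scala
import org.apache.spark.sql.functions.from_json
import spark.implicits._

// Read the raw lines untouched: spark.read.text yields a single
// string column, renamed here to "values".
val raw = spark.read.text(path).toDF("values")

// A second pass over the same file, just to infer the schema.
val inferredSchema = spark.read.json(path).schema

// Parse each raw line with the inferred schema and keep the original.
raw.withColumn("json", from_json($"values", inferredSchema))
   .select("values", "json.*")
   .show(false)
```

This still scans the file twice (once for inference, once for the data), but it avoids both the join and a hand-maintained StructType, and the values column holds each line exactly as it appears on disk.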

Hope this helps!

answered Nov 11 '22 by koiralo