Retain raw JSON as column in Spark DataFrame on read/load?

I have been looking for a way to add my raw (JSON) data as a column when reading my data into a Spark DataFrame. I have one way to do this with a join but am hoping there is a way to do this in a single operation using Spark 2.2.x+.

For example, given this data:

{"team":"Golden Knights","colors":"gold,red,black","origin":"Las Vegas"}
{"team":"Sharks","origin": "San Jose", "eliminated":"true"}

When executing:

val logs = sc.textFile("/Users/vgk/data/tiny.json") // example data file
spark.read.json(logs).show()

Predictably we get:

|        colors|eliminated|              origin|          team|
|gold,red,black|      null|           Las Vegas|Golden Knights|
|          null|      true|            San Jose|        Sharks|
|red,green,gold|      null|           Minnesota|          Wild|
|red,white,blue|     false|District of Columbia|      Capitals|

What I'd like to have on initial load is the above, but with the raw JSON data as an additional column. For example (truncated raw values):

|        colors|eliminated|              origin|          team|               value|
|red,white,blue|     false|District of Columbia|      Capitals|{"colors":"red,wh...|
|gold,red,black|      null|           Las Vegas|Golden Knights|{"colors":"gold,r...|
|          null|      true|            San Jose|        Sharks|{"eliminated":"tr...|
|red,green,gold|      null|           Minnesota|          Wild|{"colors":"red,gr...|

A non-ideal solution involves a join:

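import org.apache.spark.sql.functions.monotonically_increasing_id

val logs = sc.textFile("/Users/vgk/data/tiny.json")
val parsed = spark.read.json(logs)
// IDs are expected to line up only because toJSON keeps the partitioning and row order of parsed
val df = parsed.withColumn("uniqueID", monotonically_increasing_id())
val rawdf = parsed.toJSON.withColumn("uniqueID", monotonically_increasing_id())
df.join(rawdf, "uniqueID")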

This results in the same DataFrame as above, but with an added uniqueID column. Additionally, the JSON is re-rendered from the DataFrame and is not necessarily the "raw" data. In practice the two are equivalent, but for my use case the actual raw data is preferable.
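To make the "not necessarily raw" point concrete, here is a small sketch (assuming Spark 2.2+, where spark.read.json accepts a Dataset[String], and a local SparkSession; the names RawVsRendered and rendered are hypothetical) that parses one of the example lines and serializes it back with toJSON. The re-rendered string follows the inferred schema, whose fields are sorted alphabetically, and drops the original whitespace, so it is not byte-for-byte the input line.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

object RawVsRendered {
  // Parse the raw lines into a DataFrame, then serialize each row back to a JSON string.
  def rendered(spark: SparkSession, raw: Dataset[String]): Seq[String] =
    spark.read.json(raw).toJSON.collect().toSeq

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("raw-vs-rendered").getOrCreate()
    import spark.implicits._

    val raw = Seq("""{"team":"Sharks","origin": "San Jose", "eliminated":"true"}""").toDS()
    // Field order comes from the inferred (alphabetical) schema and the space after
    // "origin": is gone, so this differs from the original line.
    // prints {"eliminated":"true","origin":"San Jose","team":"Sharks"}
    rendered(spark, raw).foreach(println)
    spark.stop()
  }
}
```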

Is anyone aware of a solution that will capture the raw JSON data as an additional column on load?

1 Answer

If you know the schema of the data you receive, you can use from_json with that schema to extract all the fields while keeping the raw JSON string as it is:

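import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import spark.implicits._

val logs = spark.sparkContext.textFile(path) // example data file

val schema = StructType(
  StructField("team", StringType, true)::
  StructField("colors", StringType, true)::
  StructField("eliminated", StringType, true)::
  StructField("origin", StringType, true)::Nil
)

// Keep each raw line in "values" and expand the parsed struct into top-level columns
logs.toDF("values")
    .withColumn("json", from_json($"values", schema))
    .select("values", "json.*")
    .show(false)
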
|values                                                                  |team          |colors        |eliminated|origin   |
|{"team":"Golden Knights","colors":"gold,red,black","origin":"Las Vegas"}|Golden Knights|gold,red,black|null      |Las Vegas|
|{"team":"Sharks","origin": "San Jose", "eliminated":"true"}             |Sharks        |null          |true      |San Jose |
|{"team":"Wild","colors":"red,green,gold","origin":"Minnesota"}          |Wild          |red,green,gold|null      |Minnesota|
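If writing the schema by hand is inconvenient, one option (a sketch that goes beyond the answer above, assuming Spark 2.2+ where spark.read.json accepts a Dataset[String]) is to let Spark infer the schema from the same raw lines first and then apply from_json with it. The names RawJsonColumn and withRawJson are hypothetical; for a file, spark.read.textFile(path) gives a Dataset[String] with a single "value" column that can be passed in directly.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}

object RawJsonColumn {
  // Returns one row per input line: the untouched line in "value",
  // followed by one column per field of the inferred schema.
  def withRawJson(spark: SparkSession, raw: Dataset[String]): DataFrame = {
    // Extra pass over the data: infer the schema from the same raw lines.
    val schema = spark.read.json(raw).schema
    raw.toDF("value")
      .withColumn("json", from_json(col("value"), schema))
      .select("value", "json.*")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("raw-json-column").getOrCreate()
    import spark.implicits._

    // Two of the example lines from the question, kept inline so the sketch runs as-is.
    val raw = Seq(
      """{"team":"Golden Knights","colors":"gold,red,black","origin":"Las Vegas"}""",
      """{"team":"Sharks","origin": "San Jose", "eliminated":"true"}"""
    ).toDS()

    withRawJson(spark, raw).show(false)
    spark.stop()
  }
}
```

The trade-off is the extra inference pass; the JSON reader's samplingRatio option can reduce its cost, and when the schema is known up front the hand-written schema above avoids it entirely.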

Hope this helps!

