
Spark Scala: explode JSON array in a dataframe

Let's say I have a dataframe which looks like this:

+--------------------+--------------------+--------------------------------------------------------------+
|                id  |           Name     |                                                       Payment|
+--------------------+--------------------+--------------------------------------------------------------+
|                1   |           James    |[ {"@id": 1, "currency":"GBP"},{"@id": 2, "currency": "USD"} ]|
+--------------------+--------------------+--------------------------------------------------------------+

And the schema is:

root
 |-- id: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Payment: string (nullable = true)

How can I explode the above JSON array into the following:

+--------------------+--------------------+-------------------------------+
|                id  |           Name     |                        Payment|
+--------------------+--------------------+-------------------------------+
|                1   |           James    |   {"@id":1, "currency":"GBP"} |
+--------------------+--------------------+-------------------------------+
|                1   |           James    |   {"@id":2, "currency":"USD"} |
+--------------------+--------------------+-------------------------------+

I've been trying to use the explode function as below, but it's not working. It gives an error saying that string types cannot be exploded, and that it expects either a map or an array. This makes sense given the schema denotes it's a string rather than an array/map, but I'm not sure how to convert it into an appropriate format.

val newDF = dataframe.withColumn("nestedPayment", explode(dataframe.col("Payment")))

Any help is greatly appreciated!

Richard asked Mar 16 '17 19:03



3 Answers

My solution is to wrap your JSON array string inside a JSON object string, so that the from_json function can be used with a struct type containing an array of strings:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val dataframe = spark.sparkContext.parallelize(Seq(
    ("1", "James", "[ {\"@id\": 1, \"currency\":\"GBP\"},{\"@id\": 2, \"currency\": \"USD\"} ]")
  )).toDF("id", "Name", "Payment")

val result = dataframe
  .withColumn("wrapped_json", concat_ws("", lit("{\"array\":"), col("Payment"), lit("}")))
  .withColumn("array_json", from_json(col("wrapped_json"), StructType(Seq(StructField("array", ArrayType(StringType))))))
  .withColumn("result", explode(col("array_json.array")))

Result:

+---+-----+--------------------------------------------------------------+------------------------------------------------------------------------+----------------------------------------------------------+--------------------------+
|id |Name |Payment                                                       |wrapped_json                                                            |array_json                                                |result                    |
+---+-----+--------------------------------------------------------------+------------------------------------------------------------------------+----------------------------------------------------------+--------------------------+
|1  |James|[ {"@id": 1, "currency":"GBP"},{"@id": 2, "currency": "USD"} ]|{"array":[ {"@id": 1, "currency":"GBP"},{"@id": 2, "currency": "USD"} ]}|[[{"@id":1,"currency":"GBP"}, {"@id":2,"currency":"USD"}]]|{"@id":1,"currency":"GBP"}|
|1  |James|[ {"@id": 1, "currency":"GBP"},{"@id": 2, "currency": "USD"} ]|{"array":[ {"@id": 1, "currency":"GBP"},{"@id": 2, "currency": "USD"} ]}|[[{"@id":1,"currency":"GBP"}, {"@id":2,"currency":"USD"}]]|{"@id":2,"currency":"USD"}|
+---+-----+--------------------------------------------------------------+------------------------------------------------------------------------+----------------------------------------------------------+--------------------------+

I am using Spark 2.3.2, and Kudakwashe Nyatsanza's solution did not work for me. It throws org.apache.spark.sql.AnalysisException: cannot resolve 'jsontostructs(value)' due to data type mismatch: Input schema array<string> must be a struct or an array of structs.
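As a hedged aside: in newer Spark versions (2.4+, to my understanding), from_json accepts an ArrayType schema directly, so the wrapping step may be unnecessary there. A minimal sketch, with the field names assumed from the question's sample JSON:

```scala
import org.apache.spark.sql.functions.{col, explode, from_json}
import org.apache.spark.sql.types._

// Assumed element schema, taken from the question's sample JSON.
val paymentSchema = ArrayType(StructType(Seq(
  StructField("@id", IntegerType),
  StructField("currency", StringType)
)))

// Parse the JSON string straight into an array of structs, then explode.
val exploded = dataframe
  .withColumn("Payment", explode(from_json(col("Payment"), paymentSchema)))
```

This yields a typed struct column rather than a JSON string, which avoids a second parsing pass later.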

Apollo answered Sep 18 '22 15:09


import org.apache.spark.sql.functions.{explode, from_json, get_json_object}
import org.apache.spark.sql.types._

val newDF = dataframe.withColumn("Payment",
  explode(
    from_json(
      get_json_object($"Payment", "$."),
      ArrayType(StringType)
    )
  ))
Kudakwashe Nyatsanza answered Sep 19 '22 15:09


You'll have to parse the JSON string into an array of JSONs, and then use explode on the result (explode expects an array).

To do that (assuming Spark 2.0.*):

  • If you know all Payment values contain a json representing an array with the same size (e.g. 2 in this case), you can hard-code extraction of the first and second elements, wrap them in an array and explode:

    val newDF = dataframe.withColumn("Payment", explode(array(
      get_json_object($"Payment", "$[0]"),
      get_json_object($"Payment", "$[1]")
    )))
    
  • If you can't guarantee all records have a JSON with a 2-element array, but you can guarantee a maximum length of these arrays, you can use this trick: parse elements up to the maximum size, then filter out the resulting nulls:

    val maxJsonParts = 3 // whatever that number is...
    val jsonElements = (0 until maxJsonParts)
                         .map(i => get_json_object($"Payment", s"$$[$i]"))
    
    val newDF = dataframe
      .withColumn("Payment", explode(array(jsonElements: _*)))
      .where(!isnull($"Payment")) 
    
Tzach Zohar answered Sep 20 '22 15:09