
Spark explode nested JSON with Array in Scala

Let's say I loaded a JSON file into Spark 1.6 via

sqlContext.read.json("/hdfs/")

It gives me a DataFrame with the following schema:

root
 |-- id: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- checked: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- color: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- type: array (nullable = true)
 |    |-- element: string (containsNull = true)

The DF has only one row, with an array of all my items inside.

+--------------------+--------------------+--------------------+
|                id_e|           checked_e|             color_e|
+--------------------+--------------------+--------------------+
|[0218797c-77a6-45...|[false, true, tru...|[null, null, null...|
+--------------------+--------------------+--------------------+

I want a DF with the arrays exploded into one item per line:

+--------------------+-----+-------+
|                  id|color|checked|
+--------------------+-----+-------+
|0218797c-77a6-45f...| null|  false|
|0218797c-77a6-45f...| null|  false|
|0218797c-77a6-45f...| null|  false|
|0218797c-77a6-45f...| null|  false|
|0218797c-77a6-45f...| null|  false|
|0218797c-77a6-45f...| null|  false|
|0218797c-77a6-45f...| null|  false|
|0218797c-77a6-45f...| null|  false|
...

So far I have achieved this by registering the array DF as a temporary table and querying it with SQL using LATERAL VIEW explode:

// register the array DF (here df) as a temp table first
df.registerTempTable("temptable")

val results = sqlContext.sql("""
  SELECT id, color, checked FROM temptable
  LATERAL VIEW explode(checked_e) temptable AS checked
  LATERAL VIEW explode(id_e) temptable AS id
  LATERAL VIEW explode(color_e) temptable AS color
""")

Is there any way to achieve this directly with DataFrame functions, without using SQL? I know there is something like df.explode(...), but I could not get it to work with my data.

EDIT: It seems explode isn't what I really wanted in the first place. I want a new DataFrame that has each item of the arrays line by line. Chaining one explode per column actually gives back way more lines than my initial dataset has, because each explode is applied to every row produced by the previous one.
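To illustrate the EDIT: independent explodes multiply rows like nested loops, while an element-wise zip keeps one row per array index. A plain-Scala sketch with made-up three-element arrays standing in for id_e, checked_e and color_e:

```scala
// Hypothetical toy values for the three array columns
val ids     = Seq("a", "b", "c")
val checked = Seq("false", "true", "true")
val colors  = Seq("red", "green", "blue")

// Three independent explodes behave like nested flatMaps: a cross product
val crossProduct = for (i <- ids; ch <- checked; co <- colors) yield (i, ch, co)
println(crossProduct.size) // 27 = 3 * 3 * 3

// Zipping the arrays element-wise gives the intended one row per index
val zipped = (ids, checked, colors).zipped.toSeq
println(zipped.size) // 3
```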

asked Jul 07 '16 by user3780814

2 Answers

The following solution should work.

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._

val data = Seq((Seq(1,2,3),Seq(4,5,6),Seq(7,8,9)))
val df = sqlContext.createDataFrame(data)

// Zip the three array columns element-wise into a single array of triples
val udf3 = udf[Seq[(Int, Int, Int)], Seq[Int], Seq[Int], Seq[Int]] {
    case (a, b, c) => (a, b, c).zipped.toSeq
}

val df3 = df.select(udf3($"_1", $"_2", $"_3").alias("udf3"))
// One row per triple, instead of a cross product of three explodes
val exploded = df3.select(explode($"udf3").alias("col3"))

// Pull the struct fields back out into separate columns
exploded.withColumn("first", $"col3".getItem("_1"))
    .withColumn("second", $"col3".getItem("_2"))
    .withColumn("third", $"col3".getItem("_3")).show

That said, it is more straightforward to do this in plain Scala code directly, and it may be more efficient too: Spark cannot parallelize anything when there is only one row.

val data = Seq((Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))
// Flatten directly in Scala: zip the three sequences into row tuples
val seqExploded = data.flatMap {
    case (a: Seq[Int], b: Seq[Int], c: Seq[Int]) => (a, b, c).zipped.toSeq
}
val dfTheSame = sqlContext.createDataFrame(seqExploded)
dfTheSame.show
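One caveat with the zipped approach (both in the UDF and in the plain-Scala version): Tuple3.zipped truncates to the shortest input, so ragged arrays silently drop trailing elements. A quick sketch with made-up data:

```scala
val a = Seq(1, 2, 3)
val b = Seq("x", "y")          // one element short
val c = Seq(true, false, true)

// zipped stops at the shortest sequence, so the third row is dropped
val rows = (a, b, c).zipped.toSeq
println(rows) // List((1,x,true), (2,y,false))
```

If the arrays in the real data can have unequal lengths, this needs handling (e.g. padding) before zipping.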
answered Oct 20 '22 by Rockie Yang

It should be as simple as this:

df.withColumn("id", explode(col("id_e")))
  .withColumn("checked", explode(col("checked_e")))
  .withColumn("color", explode(col("color_e")))
answered Oct 20 '22 by Vitalii Kotliarenko