Let's say I loaded a JSON file into Spark 1.6 via
sqlContext.read.json("/hdfs/")
It gives me a DataFrame with the following schema:
root
|-- id: array (nullable = true)
| |-- element: string (containsNull = true)
|-- checked: array (nullable = true)
| |-- element: string (containsNull = true)
|-- color: array (nullable = true)
| |-- element: string (containsNull = true)
|-- type: array (nullable = true)
| |-- element: string (containsNull = true)
The DataFrame has only one row, with arrays holding all my items:
+--------------------+--------------------+--------------------+
| id_e| checked_e| color_e|
+--------------------+--------------------+--------------------+
|[0218797c-77a6-45...|[false, true, tru...|[null, null, null...|
+--------------------+--------------------+--------------------+
I want a DataFrame with the arrays exploded into one item per line:
+--------------------+-----+-------+
| id|color|checked|
+--------------------+-----+-------+
|0218797c-77a6-45f...| null| false|
|0218797c-77a6-45f...| null| false|
|0218797c-77a6-45f...| null| false|
|0218797c-77a6-45f...| null| false|
|0218797c-77a6-45f...| null| false|
|0218797c-77a6-45f...| null| false|
|0218797c-77a6-45f...| null| false|
|0218797c-77a6-45f...| null| false|
...
So far I have achieved this by registering the array DataFrame as a temporary table and using SQL with lateral view explode on these columns:
val results = sqlContext.sql("""
  SELECT id, color, checked FROM temptable
  LATERAL VIEW explode(checked_e) temptable AS checked
  LATERAL VIEW explode(id_e) temptable AS id
  LATERAL VIEW explode(color_e) temptable AS color
""")
Is there any way to achieve this directly with DataFrame functions, without using SQL? I know there is something like df.explode(...), but I could not get it to work with my data.
EDIT: It seems explode isn't what I really wanted in the first place. I want a new DataFrame that has each item of the arrays line by line. The explode function actually returns far more rows than my initial dataset has.
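The blow-up described in the EDIT comes from exploding each array independently: that yields every combination of elements rather than pairing them by position. A plain-Scala sketch (independent of Spark, with made-up sample values) of the difference:

```scala
// Made-up sample values standing in for the real array columns.
val ids     = Seq("a", "b")
val colors  = Seq("red", "blue")
val checked = Seq(true, false)

// What three independent explodes produce: the Cartesian product
// of the arrays, i.e. 2 * 2 * 2 = 8 rows.
val crossProduct = for (i <- ids; c <- colors; k <- checked) yield (i, c, k)

// What is actually wanted: one row per position, i.e. 2 rows.
val zipped = ids.zip(colors).zip(checked).map { case ((i, c), k) => (i, c, k) }

println(crossProduct.size) // 8
println(zipped)            // List((a,red,true), (b,blue,false))
```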
The following solution should work.
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._

val data = Seq((Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))
val df = sqlContext.createDataFrame(data)

// Zip the three arrays position-wise into a single array of triples.
val udf3 = udf[Seq[(Int, Int, Int)], Seq[Int], Seq[Int], Seq[Int]] {
  case (a, b, c) => (a, b, c).zipped.toSeq
}

val df3 = df.select(udf3($"_1", $"_2", $"_3").alias("udf3"))

// Explode the zipped array: one row per position instead of a cross product.
val exploded = df3.select(explode($"udf3").alias("col3"))

exploded.withColumn("first", $"col3".getItem("_1"))
  .withColumn("second", $"col3".getItem("_2"))
  .withColumn("third", $"col3".getItem("_3"))
  .show
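The core of the UDF above is `(a, b, c).zipped.toSeq`, which pairs the three sequences position-wise; the same result can be obtained with plain zip calls (a minimal sketch on the sample data):

```scala
val a = Seq(1, 2, 3)
val b = Seq(4, 5, 6)
val c = Seq(7, 8, 9)

// Position-wise zip of three sequences, equivalent to (a, b, c).zipped.toSeq
val rows = a.zip(b).zip(c).map { case ((x, y), z) => (x, y, z) }
// rows == Seq((1, 4, 7), (2, 5, 8), (3, 6, 9))
```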
It is more straightforward (and probably more efficient) to use plain Scala code directly. Spark cannot help with parallelism anyway when there is only one row.
val data = Seq((Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))
val seqExploded = data.flatMap {
  case (a: Seq[Int], b: Seq[Int], c: Seq[Int]) => (a, b, c).zipped.toSeq
}
val dfTheSame = sqlContext.createDataFrame(seqExploded)
dfTheSame.show
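If the arrays share an element type, Scala's transpose generalizes this zip to any number of columns (a plain-Scala sketch; note that transpose throws an IllegalArgumentException if the arrays have different lengths):

```scala
val columns = Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9))

// transpose turns N parallel columns into one row per position
val rows = columns.transpose
// rows == Seq(Seq(1, 4, 7), Seq(2, 5, 8), Seq(3, 6, 9))
```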
It should be as simple as this:
df.withColumn("id", explode(col("id_e")))
  .withColumn("checked", explode(col("checked_e")))
  .withColumn("color", explode(col("color_e")))
Note, however, that chaining explode over several array columns like this multiplies the rows: each explode is applied to the already-exploded result, producing the Cartesian product of the arrays rather than a position-wise pairing. That is exactly the blow-up described in the question's EDIT.