I have the following JSON loaded as a DataFrame:
root
 |-- data: struct (nullable = true)
 |    |-- field1: string (nullable = true)
 |    |-- field2: string (nullable = true)
 |-- moreData: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- more1: string (nullable = true)
 |    |    |-- more2: string (nullable = true)
 |    |    |-- more3: string (nullable = true)
I want to get the following RDD from this DataFrame:
RDD[(more1, more2, more3, field1, field2)]
How can I do this? I suspect I need flatMap to handle the nested JSON.
A combination of explode and dot syntax should do the trick:
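// toDF and the $"..." column syntax need the SQL implicits. spark-shell imports them
// automatically; elsewhere, import spark.implicits._ (or sqlContext.implicits._ on Spark 1.x).
import org.apache.spark.sql.functions.explode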
case class Data(field1: String, field2: String)
case class MoreData(more1: String, more2: String, more3: String)
val df = sc.parallelize(Seq(
  (Data("foo", "bar"), Array(MoreData("a", "b", "c"), MoreData("d", "e", "f")))
)).toDF("data", "moreData")
df.printSchema
// root
//  |-- data: struct (nullable = true)
//  |    |-- field1: string (nullable = true)
//  |    |-- field2: string (nullable = true)
//  |-- moreData: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- more1: string (nullable = true)
//  |    |    |-- more2: string (nullable = true)
//  |    |    |-- more3: string (nullable = true)
val columns = Seq(
  $"moreData.more1", $"moreData.more2", $"moreData.more3",
  $"data.field1", $"data.field2")
val aRDD = df.withColumn("moreData", explode($"moreData"))
  .select(columns: _*)
  .rdd
aRDD.collect
// Array[org.apache.spark.sql.Row] = Array([a,b,c,foo,bar], [d,e,f,foo,bar])
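Since the question mentions flatMap: explode followed by select is conceptually a flatMap over the array column, emitting one output row per array element with the parent struct's fields repeated. Here is a minimal sketch of that idea on plain Scala collections, with no Spark involved. The Record case class and the FlattenSketch object are hypothetical names used only for this illustration.

```scala
object FlattenSketch {
  // Same shapes as the DataFrame schema; Record is a hypothetical wrapper for one row.
  case class Data(field1: String, field2: String)
  case class MoreData(more1: String, more2: String, more3: String)
  case class Record(data: Data, moreData: Seq[MoreData])

  val records: Seq[Record] = Seq(
    Record(Data("foo", "bar"), Seq(MoreData("a", "b", "c"), MoreData("d", "e", "f")))
  )

  // One tuple per array element; the fields of `data` are copied onto each tuple.
  val flattened: Seq[(String, String, String, String, String)] =
    records.flatMap { r =>
      r.moreData.map(m => (m.more1, m.more2, m.more3, r.data.field1, r.data.field2))
    }
  // flattened == Seq(("a", "b", "c", "foo", "bar"), ("d", "e", "f", "foo", "bar"))
}
```

The Spark pipeline above does the same thing on distributed data and yields aRDD as an RDD of Row objects.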
Depending on your requirements, you can follow this with map to extract the values from each Row:
import org.apache.spark.sql.Row
aRDD.map{case Row(m1: String, m2: String, m3: String, f1: String, f2: String) =>
  (m1, m2, m3, f1, f2)}
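One caveat: every field in the schema is nullable, and a typed pattern such as m1: String does not match null, so the pattern match above would throw a scala.MatchError on a row with a missing value. If nulls are possible in your data, one option is to wrap each field in Option instead. This is only a sketch; NullSafeRows, Fields, and toOptions are hypothetical names, and it assumes the five columns arrive in the order selected above.

```scala
import org.apache.spark.sql.Row

object NullSafeRows {
  // Hypothetical alias for the five optional string fields.
  type Fields = (Option[String], Option[String], Option[String], Option[String], Option[String])

  // Reads each column by position; Option(...) turns a null value into None.
  def toOptions(row: Row): Fields = {
    def str(i: Int): Option[String] = Option(row.getAs[String](i))
    (str(0), str(1), str(2), str(3), str(4))
  }
}
```

With this in scope, aRDD.map(NullSafeRows.toOptions) would give an RDD of optional tuples rather than failing on the first null.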
See also Querying Spark SQL DataFrame with complex types