
Accessing nested data in Spark

I have a collection of nested case classes. I've got a job that generates a dataset using these case classes, and writes the output to parquet.

I was pretty annoyed to discover that I have to manually do a load of faffing around to load and convert this data back to case classes to work with it in subsequent jobs. Anyway, that's what I'm now trying to do.

My case classes are like:

case class Person(userId: String, tech: Option[Tech])
case class Tech(browsers: Seq[Browser], platforms: Seq[Platform])
case class Browser(family: String, version: Int)

So I'm loading my parquet data. I can get the tech data as a Row with:

val df = sqlContext.load("part-r-00716.gz.parquet")
val x = df.head
val tech = x.getStruct(x.fieldIndex("tech"))

But now I can't find how to actually iterate over the browsers. If I try val browsers = tech.getStruct(tech.fieldIndex("browsers")) I get an exception:

java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to org.apache.spark.sql.Row

How can I iterate over my nested browser data using Spark 1.5.2?

Update: In fact, my case classes contain optional values, so Browser is actually:

case class Browser(family: String,
               major: Option[String] = None, 
               minor: Option[String] = None,
               patch: Option[String] = None, 
               language: String,
               timesSeen: Long = 1,
               firstSeenAt: Long,
               lastSeenAt: Long)

I also have a similar class for Os:

case class Os(family: String,
          major: Option[String] = None,
          minor: Option[String] = None,
          patch: Option[String] = None,
          patchMinor: Option[String],
          override val timesSeen: Long = 1,
          override val firstSeenAt: Long,
          override val lastSeenAt: Long)

And so Tech is really:

case class Technographic(browsers: Seq[Browser], 
                     devices: Seq[Device],
                     oss: Seq[Os])

Now, given the fact that some values are optional, I need a solution that will allow me to reconstruct my case classes correctly. The current solution doesn't support None values, so for example given the input data:

Tech(browsers=Seq(
    Browser(family=Some("IE"), major=Some(7), language=Some("en"), timesSeen=3),
    Browser(family=None, major=None, language=Some("en-us"), timesSeen=1),
    Browser(family=Some("Firefox"), major=None, language=None, timesSeen=1)
  )
)

I need it to load the data as follows:

family=IE, major=7, language=en, timesSeen=3,
family=None, major=None, language=en-us, timesSeen=1,
family=Firefox, major=None, language=None, timesSeen=1

Because the current solution doesn't support None values, it in fact has an arbitrary number of values per list item, i.e.:

browsers.family = ["IE", "Firefox"]
browsers.major = [7]
browsers.language = ["en", "en-us"]
timesSeen = [3, 1, 1]

As you can see, there's no way of converting the final data (returned by spark) into the case classes that generated it.

How can I work around this insanity?

jbrown asked Dec 02 '15 12:12


1 Answer

Some examples

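// col and explode (used further down) come from the functions object
import org.apache.spark.sql.functions.{col, explode}

// Select two columns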
df.select("userId", "tech.browsers").show()

// Select the nested values only
df.select("tech.browsers").show(truncate = false)
+-------------------------+
|browsers                 |
+-------------------------+
|[[Firefox,4], [Chrome,2]]|
|[[Firefox,4], [Chrome,2]]|
|[[IE,25]]                |
|[]                       |
|null                     |
+-------------------------+

// Extract the family (nested value)
// This way you can iterate over the persons, and get their browsers
// Family values are nested
df.select("tech.browsers.family").show()
+-----------------+
|           family|
+-----------------+
|[Firefox, Chrome]|
|[Firefox, Chrome]|
|             [IE]|
|               []|
|             null|
+-----------------+

// Normalize the family: One row for each family
// Then you can iterate over all families
// Family values are un-nested; explode() emits no rows for empty or null arrays
df.select(explode(col("tech.browsers.family")).alias("family")).show()
+-------+
| family|
+-------+
|Firefox|
| Chrome|
|Firefox|
| Chrome|
|     IE|
+-------+

Based on the last example:

val families = df.select(explode(col("tech.browsers.family")))
  .map(r => r.getString(0)).distinct().collect().toList
println(families)

gives the distinct browser families as a plain local Scala list:

List(IE, Firefox, Chrome)
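
To get from Rows back to case classes with the None values intact, one option is to convert each Row by hand. The following is only a sketch under assumptions: Browser, Tech and Person here are cut-down, hypothetical versions of the classes in the question, and RowConversions and opt are names made up for illustration. The two points it relies on are that an array-of-struct column comes back as a Seq[Row] (which is why getStruct throws the ClassCastException), and that a null column can be detected with isNullAt and mapped to None.

```scala
import org.apache.spark.sql.Row

// Cut-down, hypothetical stand-ins for the case classes in the question.
case class Browser(family: String, major: Option[String], timesSeen: Long)
case class Tech(browsers: Seq[Browser])
case class Person(userId: String, tech: Option[Tech])

object RowConversions {

  // Read a nullable column as an Option: null becomes None.
  def opt[T](row: Row, name: String): Option[T] = {
    val i = row.fieldIndex(name)
    if (row.isNullAt(i)) None else Some(row.getAs[T](i))
  }

  def toBrowser(row: Row): Browser = Browser(
    family = row.getAs[String]("family"),
    major = opt[String](row, "major"),
    timesSeen = row.getAs[Long]("timesSeen"))

  def toTech(row: Row): Tech = {
    // The nested array is a Seq[Row], not a Row; a null array is
    // treated as an empty list here (an assumption of this sketch).
    val browsers = opt[Seq[Row]](row, "browsers").getOrElse(Seq.empty[Row])
    Tech(browsers.map(toBrowser))
  }

  def toPerson(row: Row): Person = Person(
    userId = row.getAs[String]("userId"),
    tech = opt[Row](row, "tech").map(toTech))
}
```

With the DataFrame from the question this would be used along the lines of df.map(RowConversions.toPerson), which in Spark 1.5 yields an RDD[Person]. Because each Browser is built from its own Row, a null major stays attached to the right browser instead of disappearing from a flattened per-field list.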

Beryllium answered Sep 22 '22 11:09