Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to read a nested collection in Spark

I have a parquet table with one of the columns being

, array<struct<col1,col2,..colN>>

Can run queries against this table in Hive using LATERAL VIEW syntax.

How to read this table into an RDD, and more importantly how to filter, map etc this nested collection in Spark?

Could not find any references to this in Spark documentation. Thanks in advance for any information!

ps. I felt might be helpful to give some stats on the table. Number of columns in main table ~600. Number of rows ~200m. Number of "columns" in nested collection ~10. Avg number of records in nested collection ~35.

like image 617
Tagar Avatar asked May 02 '15 22:05

Tagar


2 Answers

There is no magic in the case of nested collection. Spark will handle the same way a RDD[(String, String)] and a RDD[(String, Seq[String])].

Reading such nested collection from Parquet files can be tricky, though.

Let's take an example from the spark-shell (1.3.1):

scala> import sqlContext.implicits._
import sqlContext.implicits._

scala> case class Inner(a: String, b: String)
defined class Inner

scala> case class Outer(key: String, inners: Seq[Inner])
defined class Outer

Write the parquet file:

scala> val outers = sc.parallelize(List(Outer("k1", List(Inner("a", "b")))))
outers: org.apache.spark.rdd.RDD[Outer] = ParallelCollectionRDD[0] at parallelize at <console>:25

scala> outers.toDF.saveAsParquetFile("outers.parquet")

Read the parquet file:

scala> import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.expressions.Row

scala> val dataFrame = sqlContext.parquetFile("outers.parquet")
dataFrame: org.apache.spark.sql.DataFrame = [key: string, inners: array<struct<a:string,b:string>>]   

scala> val outers = dataFrame.map { row =>
     |   val key = row.getString(0)
     |   val inners = row.getAs[Seq[Row]](1).map(r => Inner(r.getString(0), r.getString(1)))
     |   Outer(key, inners)
     | }
outers: org.apache.spark.rdd.RDD[Outer] = MapPartitionsRDD[8] at map at DataFrame.scala:848

The important part is row.getAs[Seq[Row]](1). The internal representation of a nested sequence of struct is ArrayBuffer[Row], you could use any super-type of it instead of Seq[Row]. The 1 is the column index in the outer row. I used the method getAs here but there are alternatives in the latest versions of Spark. See the source code of the Row trait.

Now that you have a RDD[Outer], you can apply any wanted transformation or action.

// Filter the outers
outers.filter(_.inners.nonEmpty)

// Filter the inners
outers.map(outer => outer.copy(inners = outer.inners.filter(_.a == "a")))

Note that we used the spark-SQL library only to read the parquet file. You could for example select only the wanted columns directly on the DataFrame, before mapping it to a RDD.

dataFrame.select('col1, 'col2).map { row => ... }
like image 110
Lomig Mégard Avatar answered Sep 23 '22 08:09

Lomig Mégard


I'll give a Python-based answer since that's what I'm using. I think Scala has something similar.

The explode function was added in Spark 1.4.0 to handle nested arrays in DataFrames, according to the Python API docs.

Create a test dataframe:

from pyspark.sql import Row

df = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3]), Row(a=2, intlist=[4,5,6])])
df.show()

## +-+--------------------+
## |a|             intlist|
## +-+--------------------+
## |1|ArrayBuffer(1, 2, 3)|
## |2|ArrayBuffer(4, 5, 6)|
## +-+--------------------+

Use explode to flatten the list column:

from pyspark.sql.functions import explode

df.select(df.a, explode(df.intlist)).show()

## +-+---+
## |a|_c0|
## +-+---+
## |1|  1|
## |1|  2|
## |1|  3|
## |2|  4|
## |2|  5|
## |2|  6|
## +-+---+
like image 28
dnlbrky Avatar answered Sep 26 '22 08:09

dnlbrky