How do I flatMap a row of arrays into multiple rows?

After parsing some JSON I have a one-column DataFrame of arrays:

scala> val jj = sqlContext.jsonFile("/home/aahu/jj2.json")
res68: org.apache.spark.sql.DataFrame = [r: array<bigint>]
scala> jj.first()
res69: org.apache.spark.sql.Row = [List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)]

I'd like to explode each row out into several rows. How?

Edit:

Original JSON file:

{"r": [0,1,2,3,4,5,6,7,8,9]}
{"r": [0,1,2,3,4,5,6,7,8,9]}

I want an RDD or a DataFrame with 20 rows.

I can't simply use flatMap here, and I'm not sure what the appropriate Spark call is:

scala> jj.flatMap(r => r)
<console>:22: error: type mismatch;
 found   : org.apache.spark.sql.Row
 required: TraversableOnce[?]
              jj.flatMap(r => r)
asked Jun 14 '15 by dranxo

3 Answers

You can use DataFrame.explode to achieve what you want. Below is what I tried in spark-shell with your sample JSON data.

import scala.collection.mutable.ArrayBuffer
val jj1 = jj.explode("r", "r1") { list: ArrayBuffer[Long] => list.toList }
val jj2 = jj1.select($"r1")
jj2.collect

You can refer to the API documentation for DataFrame.explode to learn more.
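
As a quick sanity check with the two-row sample file above (the exact Row rendering in your spark-shell session may differ):

jj2.count()  // expected: 20, one exploded row per array element
jj2.first()  // expected: a single-element Row such as [0]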

answered Nov 11 '22 by Wesley Miao

Alternatively, you can use the Row.getAs function (I've tested this with Spark 1.3.1):

import scala.collection.mutable.ArrayBuffer
val elementsRdd = jj.select(jj("r")).map(t => t.getAs[ArrayBuffer[Long]](0)).flatMap(x => x)
elementsRdd.count()
>>>Long = 20
elementsRdd.take(5)
>>>Array[Long] = Array(0, 1, 2, 3, 4)
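
If your Spark version provides Row.getSeq (it appears in later 1.x releases), here is a sketch of a variant that avoids hard-coding ArrayBuffer, which is an internal detail of how Spark materializes the array:

val elementsRdd2 = jj.select(jj("r")).flatMap(row => row.getSeq[Long](0))
elementsRdd2.count()  // expected: 20 for the sample data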
answered Nov 11 '22 by vvladymyrov


In Spark 1.3+ you can use the explode function directly on the column of interest:

import org.apache.spark.sql.functions.explode

jj.select(explode($"r"))
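
A small usage sketch; the alias "rv" is illustrative, not required by the API:

val exploded = jj.select(explode($"r").as("rv"))
exploded.count()  // expected: 20 rows for the two 10-element arrays above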
answered Nov 11 '22 by zero323