I am using the new Apache Spark 1.4.0 DataFrames API to extract information from Twitter's Status JSON, mostly focused on the entities object. The part relevant to this question is shown below:
{
  ...
  ...
  "entities": {
    "hashtags": [],
    "trends": [],
    "urls": [],
    "user_mentions": [
      {
        "screen_name": "linobocchini",
        "name": "Lino Bocchini",
        "id": 187356243,
        "id_str": "187356243",
        "indices": [ 3, 16 ]
      },
      {
        "screen_name": "jeanwyllys_real",
        "name": "Jean Wyllys",
        "id": 111123176,
        "id_str": "111123176",
        "indices": [ 79, 95 ]
      }
    ],
    "symbols": []
  },
  ...
  ...
}
There are several examples of how to extract information from primitive types such as string, integer, etc., but I couldn't find anything on how to process this kind of complex structure.
I tried the code below, but it still doesn't work; it throws an exception:
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
val tweets = sqlContext.read.json("tweets.json")

// this function is just to filter out empty entities.user_mentions[] nodes
// (some tweets don't contain any mentions)
import org.apache.spark.sql.functions.udf
val isEmpty = udf((value: List[Any]) => value.isEmpty)

import org.apache.spark.sql._
import sqlContext.implicits._
case class UserMention(id: Long, idStr: String, indices: Array[Long], name: String, screenName: String)

val mentions = tweets.select("entities.user_mentions").
  filter(!isEmpty($"user_mentions")).
  explode($"user_mentions") {
    case Row(arr: Array[Row]) => arr.map { elem =>
      UserMention(
        elem.getAs[Long]("id"),
        elem.getAs[String]("id_str"),
        elem.getAs[Array[Long]]("indices"),
        elem.getAs[String]("name"),
        elem.getAs[String]("screen_name"))
    }
  }
mentions.first
Exception when I try to call mentions.first:
scala> mentions.first
15/06/23 22:15:06 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 8)
scala.MatchError: [List([187356243,187356243,List(3, 16),Lino Bocchini,linobocchini], [111123176,111123176,List(79, 95),Jean Wyllys,jeanwyllys_real])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
at scala.Function1$$anonfun$andThen$1.apply(Function1.scala:55)
at org.apache.spark.sql.catalyst.expressions.UserDefinedGenerator.eval(generators.scala:81)
As additional context, the automatically inferred schema is:
scala> mentions.printSchema
root
|-- user_mentions: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id: long (nullable = true)
| | |-- id_str: string (nullable = true)
| | |-- indices: array (nullable = true)
| | | |-- element: long (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- screen_name: string (nullable = true)
NOTE 1: I know it is possible to solve this using HiveQL:

SELECT explode(entities.user_mentions) AS mentions
FROM tweets

but I would like to use DataFrames, since there is so much momentum around them.
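For completeness, here is a hedged sketch of how that HiveQL route would look from the same session; registerTempTable and sqlContext.sql are the standard Spark 1.4 APIs for this (mentionsSql is just an illustrative name):

// register the DataFrame so HiveQL can see it as a table
tweets.registerTempTable("tweets")
val mentionsSql = sqlContext.sql(
  "SELECT explode(entities.user_mentions) AS mentions FROM tweets")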
NOTE 2: the UDF val isEmpty = udf((value: List[Any]) => value.isEmpty) is an ugly hack and I'm missing something here, but it was the only way I came up with to avoid an NPE.
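For reference, a UDF-free version of that filter would look something like the sketch below. Column.isNotNull works on 1.4; the size function only appeared in Spark 1.5, so that line is an assumption if you stay on 1.4.0 (withMentions is just an illustrative name):

import org.apache.spark.sql.functions.size
val withMentions = tweets.
  filter($"entities.user_mentions".isNotNull).   // guards against the NPE (works on Spark 1.4)
  filter(size($"entities.user_mentions") > 0)    // drops empty arrays (Spark 1.5+ only)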
Here is a solution that works, with just one small hack.
The main idea is to work around the type problem by declaring a List[String] rather than List[Row]:
val mentions = tweets.explode("entities.user_mentions", "mention"){m: List[String] => m}
This creates a second column called "mention" of type "Struct":
+--------------------+--------------------+
|            entities|             mention|
+--------------------+--------------------+
|[List(),List(),Li...|[187356243,187356...|
|[List(),List(),Li...|[111123176,111123...|
+--------------------+--------------------+
Now do a map() to extract the fields inside mention. The getStruct(1) call gets the value in column 1 of each row:
case class Mention(id: Long, id_str: String, indices: Seq[Long], name: String, screen_name: String)

val mentionsRdd = mentions.map { row =>
  // getStruct(1) pulls the "mention" struct out of column 1 of each row
  val mention = row.getStruct(1)
  Mention(
    mention.getLong(0),
    mention.getString(1),
    mention.getSeq[Long](2),   // indices is an array of longs per the schema
    mention.getString(3),
    mention.getString(4))
}
And convert the RDD back into a DataFrame:
val mentionsDf = mentionsRdd.toDF()
There you go!
+---------+---------+------------+-------------+---------------+
|       id|   id_str|     indices|         name|    screen_name|
+---------+---------+------------+-------------+---------------+
|187356243|187356243| List(3, 16)|Lino Bocchini|   linobocchini|
|111123176|111123176|List(79, 95)|  Jean Wyllys|jeanwyllys_real|
+---------+---------+------------+-------------+---------------+
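A footnote on the original MatchError: the error message shows the matched value is a GenericRowWithSchema wrapping a List, i.e. Spark hands ArrayType columns back as a Scala Seq of Rows, never as Array[Row], so the case Row(arr: Array[Row]) pattern in the question can never match. If you would rather keep the question's explode-with-pattern-match approach, here is a hedged, untested sketch of that one change (indices likewise comes back as a Seq, per the printed schema; this reuses isEmpty and UserMention as defined in the question, and mentions2 is just an illustrative name):

val mentions2 = tweets.select("entities.user_mentions").
  filter(!isEmpty($"user_mentions")).
  explode($"user_mentions") {
    // match on Seq[Row] instead of Array[Row]; @unchecked silences the
    // type-erasure warning on the element type
    case Row(arr: Seq[Row @unchecked]) => arr.map { elem =>
      UserMention(
        elem.getAs[Long]("id"),
        elem.getAs[String]("id_str"),
        elem.getAs[Seq[Long]]("indices").toArray,  // Seq -> Array for the case class
        elem.getAs[String]("name"),
        elem.getAs[String]("screen_name"))
    }
  }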