How to extract complex JSON structures using Apache Spark 1.4.0 Data Frames

I am using the new DataFrames API in Apache Spark 1.4.0 to extract information from Twitter's Status JSON, focusing mostly on the Entities object. The part relevant to this question is shown below:

{
  ...
  ...
  "entities": {
    "hashtags": [],
    "trends": [],
    "urls": [],
    "user_mentions": [
      {
        "screen_name": "linobocchini",
        "name": "Lino Bocchini",
        "id": 187356243,
        "id_str": "187356243",
        "indices": [ 3, 16 ]
      },
      {
        "screen_name": "jeanwyllys_real",
        "name": "Jean Wyllys",
        "id": 111123176,
        "id_str": "111123176",
        "indices": [ 79, 95 ]
      }
    ],
    "symbols": []
  },
  ...
  ...
}

There are several examples of how to extract information from primitive types such as string, integer, etc., but I couldn't find anything on how to process this kind of complex structure.

I tried the code below, but it still doesn't work; it throws an exception:

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

val tweets = sqlContext.read.json("tweets.json")

// this function only filters out empty entities.user_mentions[] nodes;
// some tweets don't contain any mentions
import org.apache.spark.sql.functions.udf
val isEmpty = udf((value: List[Any]) => value.isEmpty)

import org.apache.spark.sql._
import sqlContext.implicits._
case class UserMention(id: Long, idStr: String, indices: Array[Long], name: String, screenName: String)

val mentions = tweets.select("entities.user_mentions").
  filter(!isEmpty($"user_mentions")).
  explode($"user_mentions") {
  case Row(arr: Array[Row]) => arr.map { elem =>
    UserMention(
      elem.getAs[Long]("id"),
      elem.getAs[String]("id_str"),
      elem.getAs[Array[Long]]("indices"),
      elem.getAs[String]("name"),
      elem.getAs[String]("screen_name"))
  }
}

mentions.first

Exception when I try to call mentions.first:

scala>     mentions.first
15/06/23 22:15:06 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 8)
scala.MatchError: [List([187356243,187356243,List(3, 16),Lino Bocchini,linobocchini], [111123176,111123176,List(79, 95),Jean Wyllys,jeanwyllys_real])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
    at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
    at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
    at scala.Function1$$anonfun$andThen$1.apply(Function1.scala:55)
    at org.apache.spark.sql.catalyst.expressions.UserDefinedGenerator.eval(generators.scala:81)

What is wrong here? I understand it is related to the types, but I couldn't figure it out yet.

As additional context, the structure mapped automatically is:

scala> mentions.printSchema
root
 |-- user_mentions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- id_str: string (nullable = true)
 |    |    |-- indices: array (nullable = true)
 |    |    |    |-- element: long (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- screen_name: string (nullable = true)

NOTE 1: I know it is possible to solve this using HiveQL, but I would like to use DataFrames since there is so much momentum around them.

SELECT explode(entities.user_mentions) as mentions
FROM tweets

NOTE 2: the UDF val isEmpty = udf((value: List[Any]) => value.isEmpty) is an ugly hack and I'm probably missing something here, but it was the only way I could come up with to avoid an NPE.

asked Jun 24 '15 at 01:06 by arjones


1 Answer

Here is a solution that works, with just one small hack.

The main idea is to work around the type problem by declaring a List[String] rather than List[Row]:

val mentions = tweets.explode("entities.user_mentions", "mention"){m: List[String] => m}

This creates a second column called "mention" of type "Struct":

|            entities|             mention| 
+--------------------+--------------------+ 
|[List(),List(),Li...|[187356243,187356...| 
|[List(),List(),Li...|[111123176,111123...| 

Now do a map() to extract the fields inside mention. The getStruct(1) call reads the struct at column index 1 of each row, which is the new mention column (indices are zero-based):

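case class Mention(id: Long, id_str: String, indices: Seq[Int], name: String, screen_name: String)

// Read the fields of the exploded "mention" struct by position,
// in the order shown by printSchema in the question.
val mentionsRdd = mentions.map { row =>
  val mention = row.getStruct(1)
  Mention(
    mention.getLong(0),      // id
    mention.getString(1),    // id_str
    mention.getSeq[Int](2),  // indices
    mention.getString(3),    // name
    mention.getString(4))    // screen_name
}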

And convert the RDD back into a DataFrame:

val mentionsDf = mentionsRdd.toDF()

There you go!

|       id|   id_str|     indices|         name|    screen_name|
+---------+---------+------------+-------------+---------------+
|187356243|187356243| List(3, 16)|Lino Bocchini|   linobocchini|
|111123176|111123176|List(79, 95)|  Jean Wyllys|jeanwyllys_real|
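
A DataFrame-only variant may also be possible with the built-in explode column function from org.apache.spark.sql.functions. The sketch below has not been verified against 1.4.0; it assumes tweets and sqlContext are defined as in the question, and the name mentionsCols is made up for this example.

```scala
import org.apache.spark.sql.functions.explode
import sqlContext.implicits._ // assumes `sqlContext` from the question is in scope

// Step 1: turn each element of entities.user_mentions into its own row,
//         stored as a struct column named "mention".
// Step 2: pull the struct's fields out into top-level columns.
val mentionsCols = tweets
  .select(explode($"entities.user_mentions").as("mention"))
  .select(
    $"mention.id",
    $"mention.id_str",
    $"mention.indices",
    $"mention.name",
    $"mention.screen_name")

mentionsCols.show()
```

Because explode emits no rows for an empty or null array, this route should not need the isEmpty UDF from the question.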

answered Oct 04 '22 at 13:10 by Xinh Huynh