 

How to query JSON data column using Spark DataFrames?

I have a Cassandra table that for simplicity looks something like:

```
key: text
jsonData: text
blobData: blob
```

I can create a basic data frame for this using spark and the spark-cassandra-connector using:

```scala
val df = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "mytable", "keyspace" -> "ks1"))
  .load()
```

I'm struggling, though, to expand the JSON data into its underlying structure. I ultimately want to be able to filter based on the attributes within the JSON string and return the blob data, something like jsonData.foo = "bar" returning blobData. Is this currently possible?

asked Dec 03 '15 at 15:12 by JDesuv




1 Answer

Spark >= 2.4

If needed, the schema can be determined using the schema_of_json function (note that this assumes an arbitrary row is a valid representative of the schema):

```scala
import org.apache.spark.sql.functions.{lit, schema_of_json, from_json}
import collection.JavaConverters._

val schema = schema_of_json(lit(df.select($"jsonData").as[String].first))
df.withColumn("jsonData", from_json($"jsonData", schema, Map[String, String]().asJava))
```
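An end-to-end sketch of the schema_of_json route, assuming Spark 2.4 or later, a local SparkSession and a few hypothetical in-memory rows standing in for the Cassandra table (blobData is modelled as a plain string rather than a Cassandra blob). Because the schema is inferred from the first row only, a field absent from that row would not appear in the parsed struct:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{from_json, lit, schema_of_json}
import scala.collection.JavaConverters._

// Local session for illustration; inside spark-shell getOrCreate() returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("json-schema-infer").getOrCreate()
import spark.implicits._

// Hypothetical sample rows standing in for (key, jsonData, blobData).
val df = Seq(
  ("1", """{"k": "foo", "v": 1.0}""", "blob_1"),
  ("2", """{"k": "bar", "v": 3.0}""", "blob_2")
).toDF("key", "jsonData", "blobData")

// Infer the schema from the first JSON string only (assumed to be representative).
val sample = df.select($"jsonData").as[String].first()
val schema = schema_of_json(lit(sample))

// Replace the JSON string with a parsed struct column.
val parsed = df.withColumn("jsonData", from_json($"jsonData", schema, Map[String, String]().asJava))

// Filter on a nested attribute and return only the blob column.
val result = parsed.where($"jsonData.k" === "foo").select($"blobData").as[String].collect()
```

Calling parsed.printSchema() is a quick way to check that the inferred struct matches what you expect before relying on it in filters.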

Spark >= 2.1

You can use the from_json function:

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("k", StringType, true), StructField("v", DoubleType, true)
))

df.withColumn("jsonData", from_json($"jsonData", schema))
```
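To tie this back to the question (filter on a JSON attribute and return the blob), here is a minimal sketch for Spark 2.x or later, assuming a local SparkSession and hypothetical in-memory rows in place of the Cassandra table, with blobData modelled as a plain string. It parses jsonData with an explicit schema, filters on the nested field and keeps only blobData:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

// Local session for illustration; inside spark-shell getOrCreate() returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("json-filter").getOrCreate()
import spark.implicits._

// Hypothetical sample rows standing in for (key, jsonData, blobData).
val df = Seq(
  ("1", """{"k": "foo", "v": 1.0}""", "blob_1"),
  ("2", """{"k": "bar", "v": 3.0}""", "blob_2")
).toDF("key", "jsonData", "blobData")

// Explicit schema of the JSON payload.
val schema = StructType(Seq(
  StructField("k", StringType, true),
  StructField("v", DoubleType, true)
))

// Parse, filter on the nested attribute, then project the blob column only.
val result = df
  .withColumn("jsonData", from_json($"jsonData", schema))
  .where($"jsonData.k" === "bar")
  .select($"blobData")
  .as[String]
  .collect()
```

Against the real table, only the Seq(...).toDF(...) part would change: the DataFrame would come from the connector load shown in the question.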

Spark >= 1.6

You can use get_json_object, which takes a column and a path:

```scala
import org.apache.spark.sql.functions.get_json_object

val exprs = Seq("k", "v").map(
  c => get_json_object($"jsonData", s"$$.$c").alias(c))

df.select($"*" +: exprs: _*)
```

It extracts the fields as individual strings, which can then be cast to the expected types.

The path argument is expressed using dot syntax, with a leading $. denoting the document root (since the code above uses string interpolation, $ has to be escaped, hence $$.).
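The same filter can be expressed with get_json_object alone. A minimal sketch, assuming a Spark 2.x+ SparkSession (on 1.6 the column expressions are the same but the entry point is sqlContext) and hypothetical sample rows. Because the paths below are ordinary string literals rather than interpolated ones, "$.k" needs no escaping; the extracted v string is cast to double before comparing:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.get_json_object

// Local session for illustration; inside spark-shell getOrCreate() returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("json-path").getOrCreate()
import spark.implicits._

// Hypothetical sample rows standing in for (key, jsonData, blobData).
val df = Seq(
  ("1", """{"k": "foo", "v": 1.0}""", "blob_1"),
  ("2", """{"k": "bar", "v": 3.0}""", "blob_2")
).toDF("key", "jsonData", "blobData")

val result = df
  .select(
    $"key",
    get_json_object($"jsonData", "$.k").alias("k"),                 // extracted as a string
    get_json_object($"jsonData", "$.v").cast("double").alias("v"),  // string cast to double
    $"blobData")
  .where($"k" === "foo" && $"v" > 0.5)
  .select($"blobData")
  .as[String]
  .collect()
```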

Spark <= 1.5

Is this currently possible?

As far as I know, it is not directly possible. You can try something similar to this:

```scala
val df = sc.parallelize(Seq(
  ("1", """{"k": "foo", "v": 1.0}""", "some_other_field_1"),
  ("2", """{"k": "bar", "v": 3.0}""", "some_other_field_2")
)).toDF("key", "jsonData", "blobData")
```

I assume that the blob field cannot be represented in JSON; otherwise you can omit the splitting and joining:

```scala
import org.apache.spark.sql.Row

val blobs = df.drop("jsonData").withColumnRenamed("key", "bkey")
val jsons = sqlContext.read.json(df.drop("blobData").map{
  case Row(key: String, json: String) =>
    s"""{"key": "$key", "jsonData": $json}"""
})

val parsed = jsons.join(blobs, $"key" === $"bkey").drop("bkey")
parsed.printSchema

// root
//  |-- jsonData: struct (nullable = true)
//  |    |-- k: string (nullable = true)
//  |    |-- v: double (nullable = true)
//  |-- key: string (nullable = true)
//  |-- blobData: string (nullable = true)
```

An alternative (cheaper, although more complex) approach is to use a UDF to parse the JSON and output a struct or map column. For example, something like this:

```scala
import net.liftweb.json.parse
import org.apache.spark.sql.functions.udf

case class KV(k: String, v: Int)

// Parse each JSON string into a KV case class, which Spark exposes as a struct column
val parseJson = udf((s: String) => {
  implicit val formats = net.liftweb.json.DefaultFormats
  parse(s).extract[KV]
})

val parsed = df.withColumn("parsedJSON", parseJson($"jsonData"))
parsed.show

// +---+--------------------+------------------+----------+
// |key|            jsonData|          blobData|parsedJSON|
// +---+--------------------+------------------+----------+
// |  1|{"k": "foo", "v":...|some_other_field_1|   [foo,1]|
// |  2|{"k": "bar", "v":...|some_other_field_2|   [bar,3]|
// +---+--------------------+------------------+----------+

parsed.printSchema

// root
//  |-- key: string (nullable = true)
//  |-- jsonData: string (nullable = true)
//  |-- blobData: string (nullable = true)
//  |-- parsedJSON: struct (nullable = true)
//  |    |-- k: string (nullable = true)
//  |    |-- v: integer (nullable = false)
```
answered Oct 25 '22 at 17:10 by zero323