Spark from_json with dynamic schema

I am trying to use Spark to process JSON data with a variable structure (nested JSON). The input can be very large, with more than 1000 keys per row, and one batch can exceed 20 GB. The entire batch is generated from 30 data sources; 'key2' of each JSON record identifies its source, and the structure for each source is predefined.

What would be the best approach for processing such data? I have tried using from_json as below, but it only works with a fixed schema, and to use it I would first have to group the data by source and then apply the schema per group. Given the data volume, my preferred approach is to scan the data only once and extract the required values from each source based on its predefined schema.

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
import spark.implicits._

val data = sc.parallelize(
    """{"key1":"val1","key2":"source1","key3":{"key3_k1":"key3_v1"}}"""
    :: Nil)
val df = data.toDF


val schema = (new StructType)
  .add("key1", StringType)
  .add("key2", StringType)
  .add("key3", (new StructType)
    .add("key3_k1", StringType))


df.select(from_json($"value",schema).as("json_str"))
  .select($"json_str.key3.key3_k1").collect
res17: Array[org.apache.spark.sql.Row] = Array([xxx])
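
For context, one way to approximate the single-scan extraction described above is to peek at key2 with get_json_object (so each row is routed without fully parsing it) and then apply from_json conditionally per source. A minimal sketch, assuming two hypothetical sources; source2, schemaSource2, and its key4 field are invented for illustration:

import org.apache.spark.sql.functions.{from_json, get_json_object, when}
import org.apache.spark.sql.types._

// Only schemaSource1 mirrors the question; schemaSource2 is hypothetical.
val schemaSource1 = (new StructType)
  .add("key1", StringType)
  .add("key3", (new StructType).add("key3_k1", StringType))
val schemaSource2 = (new StructType)
  .add("key1", StringType)
  .add("key4", StringType)

val parsed = df
  // read only key2 to identify the source, without parsing the whole record
  .withColumn("source", get_json_object($"value", "$.key2"))
  // parse each row with the schema matching its source; other rows stay null
  .withColumn("s1", when($"source" === "source1", from_json($"value", schemaSource1)))
  .withColumn("s2", when($"source" === "source2", from_json($"value", schemaSource2)))

parsed.select($"source", $"s1.key3.key3_k1", $"s2.key4").show(false)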
Asked by Syntax on Mar 03 '18.



2 Answers

This is just a restatement of @Ramesh Maharjan's answer, but with more modern Spark syntax.

I found this method lurking in DataFrameReader: it lets you parse JSON strings from a Dataset[String] into an arbitrary DataFrame, taking advantage of the same schema inference Spark gives you with spark.read.json("filepath") when reading directly from a JSON file. The schema of each row can be completely different.

def json(jsonDataset: Dataset[String]): DataFrame

Example usage:

import spark.implicits._

val jsonStringDs = spark.createDataset[String](Seq(
  """{"firstname": "Sherlock", "lastname": "Holmes", "address": {"streetNumber": 121, "street": "Baker", "city": "London"}}""",
  """{"name": "Amazon", "employeeCount": 500000, "marketCap": 817117000000, "revenue": 177900000000, "CEO": "Jeff Bezos"}"""))

jsonStringDs.show

jsonStringDs:org.apache.spark.sql.Dataset[String] = [value: string]
+----------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                 |
+----------------------------------------------------------------------------------------------------------------------+
|{"firstname": "Sherlock", "lastname": "Holmes", "address": {"streetNumber": 121, "street": "Baker", "city": "London"}}|
|{"name": "Amazon", "employeeCount": 500000, "marketCap": 817117000000, "revenue": 177900000000, "CEO": "Jeff Bezos"}  |
+----------------------------------------------------------------------------------------------------------------------+


val df = spark.read.json(jsonStringDs)
df.show(false)

df:org.apache.spark.sql.DataFrame = [CEO: string, address: struct ... 6 more fields]
+----------+------------------+-------------+---------+--------+------------+------+------------+
|CEO       |address           |employeeCount|firstname|lastname|marketCap   |name  |revenue     |
+----------+------------------+-------------+---------+--------+------------+------+------------+
|null      |[London,Baker,121]|null         |Sherlock |Holmes  |null        |null  |null        |
|Jeff Bezos|null              |500000       |null     |null    |817117000000|Amazon|177900000000|
+----------+------------------+-------------+---------+--------+------------+------+------------+

The method has been available since Spark 2.2.0: http://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.DataFrameReader@json(jsonDataset:org.apache.spark.sql.Dataset[String]):org.apache.spark.sql.DataFrame
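
Applied to the question's multi-source batch, one option is to filter the raw Dataset[String] on key2 and let spark.read.json infer each source's schema separately. A sketch, not from the original answer: rawDs, sourceIds, and the second sample row are assumptions. Note this triggers one pass per source, so caching the input first is worth considering:

import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.get_json_object
import spark.implicits._

// The question's row plus an invented second-source row for illustration.
val rawDs: Dataset[String] = spark.createDataset(Seq(
  """{"key1":"val1","key2":"source1","key3":{"key3_k1":"key3_v1"}}""",
  """{"key1":"val2","key2":"source2","key4":"val4"}"""))
rawDs.cache() // avoid re-reading the whole batch for every source

val sourceIds = Seq("source1", "source2") // 30 of these in practice

val bySource: Map[String, DataFrame] = sourceIds.map { id =>
  // keep only this source's rows, then infer its schema
  val subset = rawDs.filter(get_json_object($"value", "$.key2") === id)
  id -> spark.read.json(subset)
}.toMap

bySource("source1").printSchema()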

Answered by Wade Jensen.


If you have data as mentioned in the question:

val data = sc.parallelize(
    """{"key1":"val1","key2":"source1","key3":{"key3_k1":"key3_v1"}}"""
    :: Nil)

then you don't need to create a schema for the JSON data at all. Spark SQL can infer the schema from the JSON strings themselves; you just have to use sqlContext.read.json as below:

val df = sqlContext.read.json(data)

which, for the RDD data used above, gives you the following schema (as printed by df.printSchema):

root
 |-- key1: string (nullable = true)
 |-- key2: string (nullable = true)
 |-- key3: struct (nullable = true)
 |    |-- key3_k1: string (nullable = true)

You can then select key3_k1 as follows:

df.select("key3.key3_k1").show(false)
//+-------+
//|key3_k1|
//+-------+
//|key3_v1|
//+-------+

You can manipulate the DataFrame however you wish. I hope the answer is helpful.

Answered by Ramesh Maharjan.