
Spark 2.0.0 reading json data with variable schema

I am trying to process a month's worth of website traffic, which is stored in an S3 bucket as JSON (one JSON object per line/website traffic hit). The amount of data is big enough that I can't ask Spark to infer the schema (OOM errors). If I specify the schema, it obviously loads fine. But the issue is that the fields contained in each JSON object differ, so even if I build a schema using one day's worth of traffic, the monthly schema will be different (more fields), and so my Spark job fails.

So I'm curious how others deal with this issue. I could, for example, use a traditional RDD map-reduce job to extract the fields I'm interested in, export the result, and then load everything into a dataframe. But that is slow and seems a bit self-defeating.
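
A rough sketch of that RDD-based workaround, assuming hypothetical field names (url, referrer, timestamp) and an illustrative S3 path:

import json

# Hypothetical fields of interest; real traffic records carry many more.
def extract(line):
    rec = json.loads(line)
    return (rec.get("url"), rec.get("referrer"), rec.get("timestamp"))

# Illustrative path; sc is the active SparkContext.
rdd = sc.textFile("s3a://my-bucket/traffic/2016-08/*")
df = rdd.map(extract).toDF(["url", "referrer", "timestamp"])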

I've found a similar question here, but it has no info relevant to my case.

Thanks.

asked Aug 22 '16 by xv70



1 Answer

If you know which fields you're interested in, just provide a subset of the schema; the JSON reader can gracefully ignore unexpected fields. Let's say your data looks like this:

import json
import tempfile

# Example record with nested fields, dumped to a temporary file
record = {"foo": {"bar": {"x": 1, "y": 1}, "baz": [1, 2, 3]}}

_, f = tempfile.mkstemp()
with open(f, "w") as fw:
    json.dump(record, fw)

and you're interested only in foo.bar.x and foo.bar.z (non-existent):

from pyspark.sql.types import StructType

schema = StructType.fromJson({'fields': [{'metadata': {},
   'name': 'foo',
   'nullable': True,
   'type': {'fields': [
       {'metadata': {}, 'name': 'bar', 'nullable': True, 'type': {'fields': [
           {'metadata': {}, 'name': 'x', 'nullable': True, 'type': 'long'},
           {'metadata': {}, 'name': 'z', 'nullable': True, 'type': 'double'}],
       'type': 'struct'}}],
    'type': 'struct'}}],
 'type': 'struct'})

df = spark.read.schema(schema).json(f)
df.show()

## +----------+
## |       foo|
## +----------+
## |[[1,null]]|
## +----------+

df.printSchema()
## root
##  |-- foo: struct (nullable = true)
##  |    |-- bar: struct (nullable = true)
##  |    |    |-- x: long (nullable = true)
##  |    |    |-- z: double (nullable = true)
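
For readability, the same subset schema can also be built with the explicit StructType / StructField constructors; a sketch that should be equivalent to the JSON definition above:

from pyspark.sql.types import StructType, StructField, LongType, DoubleType

# Same subset schema, written out with explicit constructors
schema = StructType([
    StructField("foo", StructType([
        StructField("bar", StructType([
            StructField("x", LongType(), True),
            StructField("z", DoubleType(), True)
        ]), True)
    ]), True)
])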

You can also reduce the sampling ratio for schema inference to improve overall performance.
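
For example, a sketch assuming an illustrative S3 path, asking the JSON source to infer the schema from roughly 10% of the input lines via its samplingRatio option:

# samplingRatio < 1.0 makes schema inference scan only a fraction of the records
df = spark.read.option("samplingRatio", 0.1).json("s3a://my-bucket/traffic/2016-08/*")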

answered Sep 22 '22 by zero323