I am trying to process a month's worth of website traffic, which is stored in an S3 bucket as JSON (one JSON object per line/website traffic hit). The data is large enough that I can't ask Spark to infer the schema (OOM errors). If I specify the schema explicitly, it loads fine. But the issue is that the fields contained in each JSON object differ, so even if I build a schema from one day's worth of traffic, the monthly schema will be different (more fields) and my Spark job fails.
So I'm curious how others deal with this issue. I could, for example, use a traditional RDD map-reduce job to extract the fields I'm interested in, export, and then load everything into a dataframe. But that is slow and seems a bit self-defeating.
I've found a similar question here but no relevant info for me.
Thanks.
If you know the fields you're interested in, just provide a subset of the schema. The JSON reader can gracefully ignore unexpected fields. Let's say your data looks like this:
import json
import tempfile
object = {"foo": {"bar": {"x": 1, "y": 1}, "baz": [1, 2, 3]}}
_, f = tempfile.mkstemp()
with open(f, "w") as fw:
json.dump(object, fw)
and you're interested only in foo.bar.x and foo.bar.z (which doesn't exist):
from pyspark.sql.types import StructType
schema = StructType.fromJson({
    'type': 'struct',
    'fields': [{
        'name': 'foo', 'nullable': True, 'metadata': {},
        'type': {
            'type': 'struct',
            'fields': [{
                'name': 'bar', 'nullable': True, 'metadata': {},
                'type': {
                    'type': 'struct',
                    'fields': [
                        {'name': 'x', 'nullable': True, 'metadata': {}, 'type': 'long'},
                        {'name': 'z', 'nullable': True, 'metadata': {}, 'type': 'double'},
                    ],
                },
            }],
        },
    }],
})
df = spark.read.schema(schema).json(f)
df.show()
## +----------+
## | foo|
## +----------+
## |[[1,null]]|
## +----------+
df.printSchema()
## root
## |-- foo: struct (nullable = true)
## | |-- bar: struct (nullable = true)
## | | |-- x: long (nullable = true)
## | | |-- z: double (nullable = true)
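If you find the fromJson dictionary hard to read, the same subset schema can be built with the StructType/StructField constructors. This is just an alternative way to write the schema above, not something required by the JSON reader:

from pyspark.sql.types import StructType, StructField, LongType, DoubleType

# Equivalent subset schema built with constructors instead of fromJson
schema = StructType([
    StructField("foo", StructType([
        StructField("bar", StructType([
            StructField("x", LongType(), True),    # present in the data
            StructField("z", DoubleType(), True),  # absent; read back as null
        ]), True),
    ]), True),
])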
If you do let Spark infer the schema, you can also reduce the sampling ratio used for inference to improve overall performance.
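For example, a minimal sketch assuming a reasonably recent Spark version and a hypothetical input path, which infers the schema from roughly 10% of the records instead of scanning everything:

# samplingRatio controls what fraction of the input rows is scanned
# when inferring the JSON schema (default 1.0 = scan all rows)
df = spark.read.json("s3://bucket/traffic/*.json", samplingRatio=0.1)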