I have the following (simplified) schema:
root
 |-- event: struct (nullable = true)
 |    |-- spent: struct (nullable = true)
 |    |    |-- amount: decimal(34,3) (nullable = true)
 |    |    |-- currency: string (nullable = true)
 |    |-- ... ~20 other struct fields at the "event" level
I'm trying to sum over the nested field:
spark.sql("select sum(event.spent.amount) from event")
According to Spark metrics, I'm reading 18 GB from disk and it takes 2.5 minutes.
However, when I select the top-level field:
spark.sql("select sum(amount) from event")
I read only 2 GB in 4 seconds.
From the physical plan I can see that with the nested structure the whole event struct, with all its fields, is read from Parquet, which is a waste.
The Parquet format should be able to provide the desired column from a nested structure without reading all of it (that is the point of a columnar store). Is there some way to do this efficiently in Spark?
There is a negative consequence to keeping a nested structure in Parquet: Spark's predicate pushdown doesn't work properly on nested fields. So even if you are only working with a few fields of your Parquet dataset, Spark will load and materialize the entire nested structure.
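As a quick illustration (a sketch: column names other than event.spent.amount are hypothetical, and the exact plan text varies by Spark version), compare the scan node for a nested filter versus a top-level one:
import spark.implicits._
val df = spark.read.parquet("fs://path/file.parquet")
// Filter on a nested field: in older Spark versions the Parquet scan shows an
// empty PushedFilters list, i.e. the predicate is not pushed down.
df.filter($"event.spent.amount" > 100).explain()
// Filter on a (hypothetical) top-level column: it shows up under PushedFilters
// and can be evaluated against Parquet row-group statistics.
df.filter($"some_top_level_column" > 100).explain()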
spark.read.parquet("fs://path/file.parquet").select(...) will only read the corresponding columns. Indeed, Parquet is a columnar storage format, and it is meant exactly for this type of use case.
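For example (a minimal sketch; note that for nested fields this only pays off once schema pruning is enabled, as the solution below shows):
val amounts = spark.read
  .parquet("fs://path/file.parquet")
  .select("event.spent.amount")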
To encode nested columns, Parquet uses the Dremel encoding with definition and repetition levels. Definition levels specify how many optional fields in the path of the column are defined. Repetition levels specify at which repeated field in the path the value is repeated.
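For example, for the column path event.spent.amount in the schema above (three optional levels, no repeated fields), the levels work out as follows:
definition level 0 -> event is null
definition level 1 -> event is set, but spent is null
definition level 2 -> spent is set, but amount is null
definition level 3 -> amount has a value
repetition level   -> always 0, since no field in the path is repeated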
Solution:
spark.sql("set spark.sql.optimizer.nestedSchemaPruning.enabled=true")
spark.sql("select sum(amount) from (select event.spent.amount as amount from event_archive)")
The query must be written in sub-select fashion; you can't wrap the selected column in an aggregate function directly. The following query will break schema pruning:
select sum(event.spent.amount) as amount from event
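You can verify that pruning actually kicked in by checking the ReadSchema of the Parquet scan in the physical plan (a sketch; the exact output differs across Spark versions):
spark.sql("select sum(amount) from (select event.spent.amount as amount from event_archive)").explain()
// With pruning, the scan reads only the needed leaf, e.g.
//   ReadSchema: struct<event:struct<spent:struct<amount:decimal(34,3)>>>
// Without pruning, it lists the full event struct with all ~20 fields.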
The whole schema-pruning effort is tracked in SPARK-4502. In recent Spark versions (3.0+), nested schema pruning is enabled by default, so the flag above is only needed on older releases.
A dirty workaround can also be to specify a "projected schema" at load time:
import org.apache.spark.sql.types._

// Keep only the leaf we need; the type must match the file's schema
// (decimal(34,3) in the schema above).
val amountType = DataTypes.createDecimalType(34, 3)
val schema = StructType(
  StructField("event", StructType(
    StructField("spent", StructType(
      StructField("amount", amountType, nullable = true) :: Nil
    ), nullable = true) :: Nil
  ), nullable = true) :: Nil
)
val df = spark.read.format("parquet").schema(schema).load(<path>)
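With the projected schema in place the scan only has to read the single leaf column, and the aggregation can be run directly (a sketch):
df.selectExpr("sum(event.spent.amount) as total").show()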