 

Efficiently reading a nested Parquet column in Spark

I have the following (simplified) schema:

root
 |-- event: struct (nullable = true)
 |    |-- spent: struct (nullable = true)
 |    |    |-- amount: decimal(34,3) (nullable = true)
 |    |    |-- currency: string (nullable = true)
 |    |
 |    | ... ~ 20 other struct fields on "event" level

I'm trying to sum over the nested field:

spark.sql("select sum(event.spent.amount) from event")

According to the Spark metrics, I'm reading 18 GB from disk and it takes 2.5 min.

However, when I select the top-level field:

 spark.sql("select sum(amount) from event")

I read only 2 GB in 4 seconds.

From the physical plan I can see that, in the nested case, the whole event struct with all its fields is read from Parquet, which is wasteful.

The Parquet format should be able to provide the desired column from a nested structure without reading all of it (that is the point of a columnar store). Is there some way to do this efficiently in Spark?
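
For reference, inspecting the plan with explain() shows the FileScan's ReadSchema, i.e. the columns actually requested from Parquet:

spark.sql("select sum(event.spent.amount) from event").explain()
// Without nested-schema pruning, ReadSchema lists the full event struct;
// ideally it would show only struct<event:struct<spent:struct<amount:decimal(34,3)>>>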

asked Aug 02 '19 by Tomas Bartalos


People also ask

Is Parquet good for nested data?

There is a negative consequence to keeping nested structures in Parquet: Spark's predicate pushdown doesn't work properly on nested fields. So even if you are working with only a few fields of your Parquet dataset, Spark may load and materialize the entire nested structure.
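
One common mitigation, sketched below (the extra column name and output path are hypothetical, and df stands for the DataFrame holding the events), is to flatten hot nested leaves into top-level columns when writing, since top-level columns prune and push down cleanly:

import org.apache.spark.sql.functions.col

// Materialize the nested leaf as a top-level column at write time
df.withColumn("spent_amount", col("event.spent.amount"))
  .write.parquet("<flattened-path>")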

How do I read a specific column in a parquet file?

Use spark.read.parquet("fs://path/file.parquet").select(...). This will read only the corresponding columns. Indeed, Parquet is a columnar storage format, and it is meant exactly for this type of use case.
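
Spelled out as runnable Scala (the path comes from the snippet above; selecting the nested leaf keeps the scan to that column, subject to the nested-pruning caveats discussed here):

import org.apache.spark.sql.functions.col

// Project only the nested leaf; the columnar reader fetches just this column
val amounts = spark.read.parquet("fs://path/file.parquet")
  .select(col("event.spent.amount"))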

How does Parquet store nested data?

To encode nested columns, Parquet uses the Dremel encoding with definition and repetition levels. Definition levels specify how many optional fields in the column's path are defined. Repetition levels specify at which repeated field in the path the value has repeated.
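
For example, with the schema above, the path event.spent.amount has three optional fields, so its definition level ranges from 0 to 3: level 3 means amount is present, level 2 means event.spent exists but amount is null, and level 0 means event itself is null. Since nothing in this path repeats, the repetition level is always 0.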


1 Answer

Solution:

spark.sql("set spark.sql.optimizer.nestedSchemaPruning.enabled=true")
spark.sql("select sum(amount) from (select event.spent.amount as amount from event_archive)")

The query must be written in a sub-select fashion; you can't wrap the selected column in an aggregate function directly. The following query will break schema pruning:

select sum(event.spent.amount) as amount from event

The overall schema-pruning work is covered in SPARK-4502.
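
The same pattern also works via the DataFrame API; a minimal sketch, assuming the data lives at a hypothetical <path> (the conf key and functions are standard Spark APIs):

import org.apache.spark.sql.functions.{col, sum}

spark.conf.set("spark.sql.optimizer.nestedSchemaPruning.enabled", "true")
// Project the nested leaf first, then aggregate, mirroring the sub-select
val amounts = spark.read.parquet("<path>")
  .select(col("event.spent.amount").as("amount"))
amounts.agg(sum(col("amount"))).show()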

A dirty workaround can also be specifying a "projected schema" at load time:

import org.apache.spark.sql.types._

// decimal(34,3) matches the amount field's type in the file schema above
val amountType = DataTypes.createDecimalType(34, 3)
val schema = StructType(
  StructField("event", StructType(
    StructField("spent", StructType(
      StructField("amount", amountType, true) :: Nil
    ), true) :: Nil
  ), true) :: Nil
)
val df = spark.read.format("parquet").schema(schema).load("<path>")
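
With the projected schema in place, only event.spent.amount exists in the read schema, so the aggregate materializes just that one column (a usage sketch):

import org.apache.spark.sql.functions.{col, sum}

df.agg(sum(col("event.spent.amount"))).show()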
answered Oct 21 '22 by Tomas Bartalos