Why does Apache Spark read unnecessary Parquet columns within nested structures?

My team is building an ETL process to load raw delimited text files into a Parquet-based "data lake" using Spark. One of the promises of the Parquet column store is that a query will only read the necessary "column stripes".

But we're seeing unexpected columns being read for nested schema structures.

To demonstrate, here is a POC using Scala and the Spark 2.0.1 shell:

// Preliminary setup
sc.setLogLevel("INFO")
import org.apache.spark.sql.types._
import org.apache.spark.sql._

// Create a schema with nested complex structures
val schema = StructType(Seq(
    StructField("F1", IntegerType),
    StructField("F2", IntegerType),
    StructField("Orig", StructType(Seq(
        StructField("F1", StringType),
        StructField("F2", StringType))))))

// Create some sample data
val data = spark.createDataFrame(
    sc.parallelize(Seq(
        Row(1, 2, Row("1", "2")),
        Row(3, null, Row("3", "ABC")))),
    schema)

// Save it
data.write.mode(SaveMode.Overwrite).parquet("data.parquet")
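As a quick sanity check (an addition to the original POC, not part of it), the schema Spark reads back from the Parquet footer can be printed to confirm both nested fields were persisted:

// Optional check: inspect the schema inferred from the Parquet footer
spark.read.parquet("data.parquet").printSchema
// Expected to show roughly:
// root
//  |-- F1: integer (nullable = true)
//  |-- F2: integer (nullable = true)
//  |-- Orig: struct (nullable = true)
//  |    |-- F1: string (nullable = true)
//  |    |-- F2: string (nullable = true)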

Then we read the file back into a DataFrame and project to a subset of columns:

// Read it back into another DataFrame
val df = spark.read.parquet("data.parquet")

// Select & show a subset of the columns
df.select($"F1", $"Orig.F1").show

When this runs we see the expected output:

+---+-------+
| F1|Orig_F1|
+---+-------+
|  1|      1|
|  3|      3|
+---+-------+

But... the query plan shows a slightly different story:

The "optimized plan" shows:

val projected = df.select($"F1", $"Orig.F1".as("Orig_F1"))
projected.queryExecution.optimizedPlan
// Project [F1#18, Orig#20.F1 AS Orig_F1#116]
// +- Relation[F1#18,F2#19,Orig#20] parquet

And "explain" shows:

projected.explain
// == Physical Plan ==
// *Project [F1#18, Orig#20.F1 AS Orig_F1#116]
// +- *Scan parquet [F1#18,Orig#20] Format: ParquetFormat, InputPaths: hdfs://sandbox.hortonworks.com:8020/user/stephenp/data.parquet, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<F1:int,Orig:struct<F1:string,F2:string>>

And the INFO logs produced during execution also confirm that the Orig.F2 column is unexpectedly read:

16/10/21 15:13:15 INFO parquet.ParquetReadSupport: Going to read the following fields from the Parquet file:

Parquet form:
message spark_schema {
  optional int32 F1;
  optional group Orig {
    optional binary F1 (UTF8);
    optional binary F2 (UTF8);
  }
}

Catalyst form:
StructType(StructField(F1,IntegerType,true), StructField(Orig,StructType(StructField(F1,StringType,true), StructField(F2,StringType,true)),true))

According to the Dremel paper and the Parquet documentation, columns for complex nested structures should be independently stored and independently retrievable.

Questions:

  1. Is this behavior a limitation of the current Spark query engine? In other words, does Parquet support optimally executing this query, but Spark's query planner is naive?
  2. Or, is this a limitation of the current Parquet implementation?
  3. Or, am I not using the Spark APIs correctly?
  4. Or, am I misunderstanding how Dremel/Parquet column storage is supposed to work?

Possibly related: Why does the query performance differ with nested columns in Spark SQL?

Asked Oct 21 '16 by Peter Stephens


1 Answer

It's a limitation of the Spark query engine at the moment; the relevant JIRA ticket is below. Spark only handles predicate pushdown of simple types in Parquet, not nested StructTypes:

https://issues.apache.org/jira/browse/SPARK-17636
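For what it's worth, one possible mitigation (a sketch only, assuming Spark honors a user-supplied read schema when planning the Parquet scan, and not something the ticket above promises) is to pass an explicit schema that omits Orig.F2, so that field never appears in the requested ReadSchema:

import org.apache.spark.sql.types._

// Hypothetical pruned schema: only the fields the query actually needs
val prunedSchema = StructType(Seq(
  StructField("F1", IntegerType),
  StructField("Orig", StructType(Seq(
    StructField("F1", StringType))))))

// Read with the explicit schema instead of relying on the planner to prune nested fields
val prunedDf = spark.read.schema(prunedSchema).parquet("data.parquet")
prunedDf.select($"F1", $"Orig.F1").explain
// If the assumption holds, ReadSchema should now be struct<F1:int,Orig:struct<F1:string>>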

Answered Sep 21 '22 by Ewan Leith