My team is building an ETL process to load raw delimited text files into a Parquet-based "data lake" using Spark. One of the promises of the Parquet column store is that a query will read only the necessary "column stripes".
But we're seeing unexpected columns being read for nested schema structures.
To demonstrate, here is a POC using Scala and the Spark 2.0.1 shell:
// Preliminary setup
sc.setLogLevel("INFO")
import org.apache.spark.sql.types._
import org.apache.spark.sql._

// Create a schema with nested complex structures
val schema = StructType(Seq(
  StructField("F1", IntegerType),
  StructField("F2", IntegerType),
  StructField("Orig", StructType(Seq(
    StructField("F1", StringType),
    StructField("F2", StringType))))))

// Create some sample data
val data = spark.createDataFrame(
  sc.parallelize(Seq(
    Row(1, 2, Row("1", "2")),
    Row(3, null, Row("3", "ABC")))),
  schema)

// Save it
data.write.mode(SaveMode.Overwrite).parquet("data.parquet")
Then we read the file back into a DataFrame and project to a subset of columns:
// Read it back into another DataFrame
val df = spark.read.parquet("data.parquet")

// Select & show a subset of the columns
df.select($"F1", $"Orig.F1".as("Orig_F1")).show
When this runs, we see the expected output:
+---+-------+
| F1|Orig_F1|
+---+-------+
|  1|      1|
|  3|      3|
+---+-------+
But... the query plan shows a slightly different story:
The "optimized plan" shows:
val projected = df.select($"F1", $"Orig.F1".as("Orig_F1"))
projected.queryExecution.optimizedPlan
// Project [F1#18, Orig#20.F1 AS Orig_F1#116]
// +- Relation[F1#18,F2#19,Orig#20] parquet
And "explain" shows:
projected.explain
// == Physical Plan ==
// *Project [F1#18, Orig#20.F1 AS Orig_F1#116]
// +- *Scan parquet [F1#18,Orig#20] Format: ParquetFormat,
//    InputPaths: hdfs://sandbox.hortonworks.com:8020/user/stephenp/data.parquet,
//    PartitionFilters: [], PushedFilters: [],
//    ReadSchema: struct<F1:int,Orig:struct<F1:string,F2:string>>
And the INFO logs produced during execution also confirm that the Orig.F2 column is unexpectedly read:
16/10/21 15:13:15 INFO parquet.ParquetReadSupport: Going to read the following fields from the Parquet file:

Parquet form:
message spark_schema {
  optional int32 F1;
  optional group Orig {
    optional binary F1 (UTF8);
    optional binary F2 (UTF8);
  }
}

Catalyst form:
StructType(StructField(F1,IntegerType,true), StructField(Orig,StructType(StructField(F1,StringType,true), StructField(F2,StringType,true)),true))
According to the Dremel paper and the Parquet documentation, columns for complex nested structures should be independently stored and independently retrievable.
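As a sanity check that the nested leaf really is independently retrievable, one experiment is to narrow the read schema explicitly. This is a minimal sketch run in the same spark-shell session as the POC above, reusing the data.parquet file; the ReadSchema noted in the comment is our expectation, not captured output:

// Sketch: read the same file with an explicitly pruned schema
// (StructType etc. are already imported in the preliminary setup above).
val prunedSchema = StructType(Seq(
  StructField("F1", IntegerType),
  StructField("Orig", StructType(Seq(
    StructField("F1", StringType))))))

val dfPruned = spark.read.schema(prunedSchema).parquet("data.parquet")
dfPruned.select($"F1", $"Orig.F1").explain
// Expectation: ReadSchema becomes struct<F1:int,Orig:struct<F1:string>>,
// i.e. Orig.F2 is no longer requested from the file.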
Questions: Why does the scan read the whole Orig struct (including Orig.F2) when only Orig.F1 is projected? Is this expected Spark behavior, and is there a way to make Spark read only the nested fields that are actually referenced?
Possibly related: Why does the query performance differ with nested columns in Spark SQL?
It's a limitation of the Spark query engine at the moment; the relevant JIRA ticket is below. Spark only handles predicate pushdown for simple types in Parquet, not for nested StructTypes:
https://issues.apache.org/jira/browse/SPARK-17636
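Until that is resolved, one possible workaround (a sketch only, not part of the answer above) is to flatten the nested leaves into top-level columns at write time, since the plans in the question show that Spark does prune unreferenced top-level columns (F2 is dropped from the scan). The data_flat.parquet path and the Orig_F1/Orig_F2 names are purely illustrative:

// Sketch: flatten the struct before writing so each leaf is a prunable top-level column.
// Uses the `data` DataFrame from the POC; output path and column names are illustrative.
val flattened = data.select(
  $"F1",
  $"F2",
  $"Orig.F1".as("Orig_F1"),
  $"Orig.F2".as("Orig_F2"))

flattened.write.mode(SaveMode.Overwrite).parquet("data_flat.parquet")

// A query touching only F1 and Orig_F1 should now scan just those two columns.
spark.read.parquet("data_flat.parquet").select($"F1", $"Orig_F1").explain

The trade-off is that the data loses its nested shape in the lake, so this only helps where downstream consumers can work with a flattened layout.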