
Spark Parquet Statistics (min/max) integration

I have been looking into how Spark stores min/max statistics in Parquet and how it uses them for query optimization, and I have a few questions. First, the setup: on Spark 2.1.0, the following writes a DataFrame of 1000 rows with a long column and a string column, twice. The two outputs are sorted by different columns.

scala> spark.sql("select id, cast(id as string) text from range(1000)").sort("id").write.parquet("/secret/spark21-sortById")
scala> spark.sql("select id, cast(id as string) text from range(1000)").sort("Text").write.parquet("/secret/spark21-sortByText")

I added some code to parquet-tools to print out the stats, then examined the generated Parquet files:

hadoop jar parquet-tools-1.9.1-SNAPSHOT.jar meta /secret/spark21-sortById/part-00000-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet 
file:        file:/secret/spark21-sortById/part-00000-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet 
creator:     parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf) 
extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}},{"name":"text","type":"string","nullable":false,"metadata":{}}]} 

file schema: spark_schema 
--------------------------------------------------------------------------------
id:          REQUIRED INT64 R:0 D:0
text:        REQUIRED BINARY O:UTF8 R:0 D:0

row group 1: RC:5 TS:133 OFFSET:4 
--------------------------------------------------------------------------------
id:           INT64 SNAPPY DO:0 FPO:4 SZ:71/81/1.14 VC:5 ENC:PLAIN,BIT_PACKED STA:[min: 0, max: 4, num_nulls: 0]
text:         BINARY SNAPPY DO:0 FPO:75 SZ:53/52/0.98 VC:5 ENC:PLAIN,BIT_PACKED

hadoop jar parquet-tools-1.9.1-SNAPSHOT.jar meta /secret/spark21-sortByText/part-00000-3d7eac74-5ca0-44a0-b8a6-d67cc38a2bde.snappy.parquet 
file:        file:/secret/spark21-sortByText/part-00000-3d7eac74-5ca0-44a0-b8a6-d67cc38a2bde.snappy.parquet 
creator:     parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf) 
extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}},{"name":"text","type":"string","nullable":false,"metadata":{}}]} 

file schema: spark_schema 
--------------------------------------------------------------------------------
id:          REQUIRED INT64 R:0 D:0
text:        REQUIRED BINARY O:UTF8 R:0 D:0

row group 1: RC:5 TS:140 OFFSET:4 
--------------------------------------------------------------------------------
id:           INT64 SNAPPY DO:0 FPO:4 SZ:71/81/1.14 VC:5 ENC:PLAIN,BIT_PACKED STA:[min: 0, max: 101, num_nulls: 0]
text:         BINARY SNAPPY DO:0 FPO:75 SZ:60/59/0.98 VC:5 ENC:PLAIN,BIT_PACKED

So the first question is: why does Spark (2.1.0 in particular) generate min/max only for numeric columns and not for string (BINARY) fields, even when the data is sorted by the string field? Did I miss a configuration?

The second question is: how can I confirm that Spark is using the min/max?

scala> sc.setLogLevel("INFO")
scala> spark.sql("select * from parquet.`/secret/spark21-sortById` where id=4").show

I got many lines like this:

17/01/17 09:23:35 INFO FilterCompat: Filtering using predicate: and(noteq(id, null), eq(id, 4))
17/01/17 09:23:35 INFO FileScanRDD: Reading File path: file:///secret/spark21-sortById/part-00000-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet, range: 0-558, partition values: [empty row]
...
17/01/17 09:23:35 INFO FilterCompat: Filtering using predicate: and(noteq(id, null), eq(id, 4))
17/01/17 09:23:35 INFO FileScanRDD: Reading File path: file:///secret/spark21-sortById/part-00193-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet, range: 0-574, partition values: [empty row]
...

It looks like Spark is scanning every file, even though the min/max should let it determine that only part-00000 has the relevant data. Or am I reading this wrong, and Spark is actually skipping the files? Or can Spark only use partition values for data skipping?

asked Jan 17 '17 14:01 by user7431049



3 Answers

PARQUET-686 changed parquet-mr to intentionally ignore statistics on binary fields where it deems that appropriate. You can override this behavior by setting parquet.strings.signed-min-max.enabled to true.

After setting that config, you can read the min/max of binary fields with parquet-tools.
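For reads from Spark rather than parquet-tools, one way to pass the same key is through the Hadoop configuration in spark-shell. This is only a sketch: it assumes a Parquet version that includes PARQUET-686, and it assumes the Parquet reader in use picks the key up from the Hadoop configuration, which I have not verified for every Spark version.

```scala
// Run inside spark-shell, where `spark` is the predefined SparkSession.
// Assumption: the Parquet reader honors this key when it is present
// in the Hadoop configuration (requires a Parquet build with PARQUET-686).
spark.sparkContext.hadoopConfiguration
  .set("parquet.strings.signed-min-max.enabled", "true")
```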

More details are in another Stack Overflow question of mine.

answered Dec 21 '22 10:12 by ruseel



This has been resolved in Spark 2.4.0, which upgraded Parquet from 1.8.2 to 1.10.0.

[SPARK-23972] Update Parquet from 1.8.2 to 1.10.0

With this upgrade, columns of all types (Int, String, Decimal, and so on) contain min/max statistics.

answered Dec 21 '22 12:12 by aakashmandlik29



For the first question, I believe this is a matter of definition (what would the min/max of a string be? lexical ordering?), but in any case, as far as I know, Spark's Parquet support currently only keeps min/max statistics for numbers.
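To make the definition problem concrete: one reading of "lexical ordering" is a byte-wise comparison of the UTF-8 encoding, and there the choice between signed and unsigned bytes changes the result for non-ASCII text. As far as I understand, this signed-versus-unsigned question is what PARQUET-686 is concerned with. The sketch below is plain Scala, not Parquet's code; `ByteOrderDemo`, `compareBytes`, and `utf8` are hypothetical names used only for illustration.

```scala
import java.nio.charset.StandardCharsets

// Illustration only: lexicographic comparison of byte arrays,
// treating each byte either as signed (the JVM default) or as unsigned.
object ByteOrderDemo {
  def compareBytes(a: Array[Byte], b: Array[Byte], unsigned: Boolean): Int = {
    val n = math.min(a.length, b.length)
    var i = 0
    while (i < n) {
      // Masking with 0xff maps a byte to 0..255; toInt keeps -128..127.
      val x = if (unsigned) a(i) & 0xff else a(i).toInt
      val y = if (unsigned) b(i) & 0xff else b(i).toInt
      if (x != y) return Integer.compare(x, y)
      i += 1
    }
    // Equal on the common prefix: the shorter array sorts first.
    Integer.compare(a.length, b.length)
  }

  def utf8(s: String): Array[Byte] = s.getBytes(StandardCharsets.UTF_8)
}
```

With this helper, "z" sorts before "\u00e9" (e with an acute accent) under the unsigned comparison but after it under the signed one, because the first UTF-8 byte of that character is 0xC3, which is negative as a signed byte. A min/max computed under one ordering can therefore be wrong for a reader that assumes the other. Pure ASCII strings such as the "0" to "999" values in the question compare the same way under both.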

As for the second question, I believe that if you look deeper you will see that Spark is not loading the data in the files. Instead, it reads the metadata to decide whether each block needs to be read. So it is effectively pushing the predicate down to the file (block) level.
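The block-level decision described here can be sketched in a few lines of plain Scala. This is not Spark's or parquet-mr's implementation; `RowGroupStats`, `MinMaxSkip`, `canSkipEq`, and `groupsToRead` are hypothetical names, and the sketch covers only an equality predicate on a long column.

```scala
// Hypothetical stand-in for the min/max statistics of one row group.
final case class RowGroupStats(path: String, min: Long, max: Long)

object MinMaxSkip {
  // For a predicate `col = value`, a row group cannot contain a match
  // when the value lies outside its [min, max] range.
  def canSkipEq(stats: RowGroupStats, value: Long): Boolean =
    value < stats.min || value > stats.max

  // Keep only the row groups that might contain a matching row.
  def groupsToRead(groups: Seq[RowGroupStats], value: Long): Seq[String] =
    groups.filterNot(canSkipEq(_, value)).map(_.path)
}
```

For example, given `RowGroupStats("part-00000", 0, 4)` (the range shown in the question's parquet-tools output) and a made-up second group `RowGroupStats("part-00001", 5, 9)`, calling `MinMaxSkip.groupsToRead(groups, 4)` keeps only `part-00000`. Note that the footer of every file still has to be opened to get these statistics, which would be consistent with seeing a log line per file.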

answered Dec 21 '22 11:12 by Assaf Mendelson
