Using partitions (with partitionBy) when writing a delta lake has no effect

When I initially write a Delta Lake table, it makes no difference whether I use partitions (with partitionBy) or not.

Repartitioning on the same column before writing only changes the number of Parquet files. Explicitly making the partition column 'not nullable' does not change the result either.

Versions:

  • Spark 2.4 (actually 2.4.0.0-mapr-620)
  • Scala 2.11.12
  • Delta Lake 0.5.0 (io.delta:delta-core_2.11:jar:0.5.0)
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

val tmp = spark.createDataFrame(
    spark.sparkContext.parallelize((1 to 10).map(n => Row(n, n % 3))), 
    StructType(Seq(StructField("CONTENT", IntegerType), StructField("PARTITION", IntegerType))))

/* 
tmp.show
+-------+---------+
|CONTENT|PARTITION|
+-------+---------+
|      1|        1|
|      2|        2|
|      3|        0|
|      4|        1|
|      5|        2|
|      6|        0|
|      7|        1|
|      8|        2|
|      9|        0|
|     10|        1|
+-------+---------+
tmp.printSchema
root
 |-- CONTENT: integer (nullable = true)
 |-- PARTITION: integer (nullable = true)
*/

tmp.write.format("delta").partitionBy("PARTITION").save("PARTITIONED_DELTA_LAKE")

The resulting delta-lake directory is as follows:

ls -1 PARTITIONED_DELTA_LAKE
_delta_log
    00000000000000000000.json
part-00000-a3015965-b101-4f63-87de-1d06a7662312-c000.snappy.parquet
part-00007-3155dde1-9f41-49b5-908e-08ce6fc077af-c000.snappy.parquet
part-00014-047f6a28-3001-4686-9742-4e4dbac05c53-c000.snappy.parquet
part-00021-e0d7f861-79e9-41c9-afcd-dbe688720492-c000.snappy.parquet
part-00028-fe3da69d-660a-445b-a99c-0e7ad2f92bf0-c000.snappy.parquet
part-00035-d69cfb9d-d320-4d9f-9b92-5d80c88d1a77-c000.snappy.parquet
part-00043-edd049a2-c952-4f7b-8ca7-8c0319932e2d-c000.snappy.parquet
part-00050-38eb3348-9e0d-49af-9ca8-a323e58b3712-c000.snappy.parquet
part-00057-906312ad-8556-4696-84ba-248b01664688-c000.snappy.parquet
part-00064-31f5d03d-2c63-40e7-8fe5-a8374eff9894-c000.snappy.parquet
part-00071-e1afc2b9-aa5b-4e7c-b94a-0c176523e9f1-c000.snappy.parquet

cat PARTITIONED_DELTA_LAKE/_delta_log/00000000000000000000.json
{"commitInfo":{"timestamp":1579073383370,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"2cdd6fbd-bffa-415e-9c06-94ffc2048cbe","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"CONTENT\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"PARTITION\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1579073381183}}
{"add":{"path":"part-00000-a3015965-b101-4f63-87de-1d06a7662312-c000.snappy.parquet","partitionValues":{},"size":363,"modificationTime":1579073382329,"dataChange":true}}
{"add":{"path":"part-00007-3155dde1-9f41-49b5-908e-08ce6fc077af-c000.snappy.parquet","partitionValues":{},"size":625,"modificationTime":1579073382545,"dataChange":true}}
{"add":{"path":"part-00014-047f6a28-3001-4686-9742-4e4dbac05c53-c000.snappy.parquet","partitionValues":{},"size":625,"modificationTime":1579073382237,"dataChange":true}}
{"add":{"path":"part-00021-e0d7f861-79e9-41c9-afcd-dbe688720492-c000.snappy.parquet","partitionValues":{},"size":625,"modificationTime":1579073382583,"dataChange":true}}
{"add":{"path":"part-00028-fe3da69d-660a-445b-a99c-0e7ad2f92bf0-c000.snappy.parquet","partitionValues":{},"size":625,"modificationTime":1579073382893,"dataChange":true}}
{"add":{"path":"part-00035-d69cfb9d-d320-4d9f-9b92-5d80c88d1a77-c000.snappy.parquet","partitionValues":{},"size":625,"modificationTime":1579073382488,"dataChange":true}}
{"add":{"path":"part-00043-edd049a2-c952-4f7b-8ca7-8c0319932e2d-c000.snappy.parquet","partitionValues":{},"size":625,"modificationTime":1579073383262,"dataChange":true}}
{"add":{"path":"part-00050-38eb3348-9e0d-49af-9ca8-a323e58b3712-c000.snappy.parquet","partitionValues":{},"size":625,"modificationTime":1579073382683,"dataChange":true}}
{"add":{"path":"part-00057-906312ad-8556-4696-84ba-248b01664688-c000.snappy.parquet","partitionValues":{},"size":625,"modificationTime":1579073382416,"dataChange":true}}
{"add":{"path":"part-00064-31f5d03d-2c63-40e7-8fe5-a8374eff9894-c000.snappy.parquet","partitionValues":{},"size":625,"modificationTime":1579073382549,"dataChange":true}}
{"add":{"path":"part-00071-e1afc2b9-aa5b-4e7c-b94a-0c176523e9f1-c000.snappy.parquet","partitionValues":{},"size":625,"modificationTime":1579073382511,"dataChange":true}}
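The same check can be done programmatically instead of reading the log by eye. The following is only a minimal sketch: `DeltaLogCheck` and `partitionColumns` are hypothetical names, not part of Delta Lake, and the regex assumes the `"partitionColumns":[...]` shape shown in the log above rather than parsing the JSON properly.

```scala
// Hypothetical helper (not a Delta Lake API): pulls the partitionColumns array
// out of the lines of a Delta commit file. A real JSON parser would be more robust.
object DeltaLogCheck {
  // Matches "partitionColumns":[ ... ] and captures the text between the brackets.
  private val PartitionColumns = """"partitionColumns":\[([^\]]*)\]""".r

  def partitionColumns(logLines: Seq[String]): Seq[String] =
    logLines
      .flatMap(line => PartitionColumns.findFirstMatchIn(line))
      .headOption                      // only the first metaData entry is considered
      .map(_.group(1))
      .toList
      .flatMap(_.split(","))
      .map(_.trim.stripPrefix("\"").stripSuffix("\""))
      .filter(_.nonEmpty)              // an empty array yields no column names
}
```

In the shell this could be fed with `scala.io.Source.fromFile("PARTITIONED_DELTA_LAKE/_delta_log/00000000000000000000.json").getLines().toList`. For the log above it returns an empty sequence, which matches the missing partition directories.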

I would expect something like this:

ls -1 PARTITIONED_DELTA_LAKE
_delta_log
    00000000000000000000.json
PARTITION=0
   part-00000-a3015965-b101-4f63-87de-1d06a7662312-c000.snappy.parquet
   ...

cat PARTITIONED_DELTA_LAKE/_delta_log/00000000000000000000.json
..."partitionBy":"[PARTITION]"...
..."partitionColumns":[PARTITION]...
..."partitionValues":{0}...
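To make the expectation concrete, here is a small plain-Scala sketch (no Spark involved) of which CONTENT values should end up under which directory. `ExpectedLayout` is a hypothetical name, and the `PARTITION=<value>` directory naming is the Hive-style convention I am assuming; how many Parquet files land in each directory is not modelled.

```scala
// Hypothetical illustration: groups the ten rows from the question by the
// directory name a Hive-style partitioned layout would be expected to use.
object ExpectedLayout {
  // Same data as tmp in the question: (CONTENT, PARTITION) with PARTITION = n % 3.
  val rows: Seq[(Int, Int)] = (1 to 10).map(n => (n, n % 3))

  // Directory name -> CONTENT values expected below that directory.
  def layout(data: Seq[(Int, Int)]): Map[String, Seq[Int]] =
    data
      .groupBy { case (_, partition) => s"PARTITION=$partition" }
      .map { case (dir, rs) => dir -> rs.map(_._1) }
}
```

Calling `ExpectedLayout.layout(ExpectedLayout.rows)` gives three entries, `PARTITION=0`, `PARTITION=1` and `PARTITION=2`, so I would expect three sub-directories instead of a flat list of part files.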

Florian Corzilius asked Jan 15 '20 08:01

1 Answer

As Jacek commented, the Spark version used is too old. I tried the code above with the following Spark versions:

  • 2.4.0
  • 2.4.1
  • 2.4.2

Partitioning works as expected only with 2.4.2. A bug fix included in that release may be the reason; its description reads:

.. Users can specify columns in partitionBy and our internal data sources will use this information. Unfortunately, for external systems, this data is silently dropped with no feedback given to the user ..
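If you cannot rule out an older Spark build, one option is to fail fast before writing. This is only a sketch under assumptions: `SparkVersionCheck` is a hypothetical helper, the 2.4.2 threshold comes purely from my tests above, and vendor builds (such as the MapR one in the question) may backport fixes, so treat the comparison as a heuristic.

```scala
// Hypothetical helper: compares the leading numeric components of version strings.
object SparkVersionCheck {
  // "2.4.0.0-mapr-620" -> List(2, 4, 0, 0); parsing stops at the first non-numeric part.
  def numericParts(version: String): List[Int] =
    version.split("[.-]").toList
      .takeWhile(p => p.nonEmpty && p.forall(_.isDigit))
      .map(_.toInt)

  // True if `version` is equal to or newer than `minimum`, comparing component by component.
  def isAtLeast(version: String, minimum: String): Boolean = {
    val (v, m) = (numericParts(version), numericParts(minimum))
    val width = math.max(v.length, m.length)
    val pairs = v.padTo(width, 0).zip(m.padTo(width, 0))
    // The first differing component decides; no difference means the versions are equal.
    pairs.find { case (a, b) => a != b }.forall { case (a, b) => a > b }
  }
}
```

In a Spark session this could be used as `require(SparkVersionCheck.isAtLeast(spark.version, "2.4.2"), "partitionBy may be silently ignored")` right before the `write` call.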

Florian Corzilius answered Sep 26 '22 05:09