For a given DataFrame, just before it is saved to parquet, here is the schema: notice that centroid0 is the first column and is StringType:
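Schematically (feature1 and feature2 are hypothetical stand-ins for the other columns), the printed schema looked along these lines:

df.printSchema()
// root
//  |-- centroid0: string (nullable = true)
//  |-- feature1: double (nullable = true)
//  |-- feature2: double (nullable = true)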
However, when saving the file using:
df.write.partitionBy(dfHolder.metadata.partitionCols: _*).format("parquet").mode("overwrite").save(fpath)
and with the partitionCols as centroid0:
then there is a (to me) surprising result: the centroid0 partition column has been moved to the end of the Row, and its type has changed from StringType to IntegerType.
I confirmed the output path via println:
path=/git/block/target/scala-2.11/test-classes/data/output/blocking/out//level1/clusters
And here is the schema upon reading back from the saved parquet:
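Schematically again (same hypothetical stand-in columns), it came back as:

spark.read.parquet(fpath).printSchema()
// root
//  |-- feature1: double (nullable = true)
//  |-- feature2: double (nullable = true)
//  |-- centroid0: integer (nullable = true)   // moved to the end and re-typed as integer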
Why are those two modifications to the input schema occurring, and how can they be avoided, while still maintaining centroid0 as a partitioning column?
Update: A preferred answer should mention why/when the partitions were added to the end (vs the beginning) of the columns list. We need an understanding of the deterministic ordering.
In addition, is there any way to cause Spark to "change its mind" on the inferred column types? I have had to change the partition values from 0, 1, etc. to c0, c1, etc. in order to get the inference to map to StringType. Maybe that was required, but if there were some Spark setting to change the behavior, that would make for an excellent answer.
When you write.partitionBy(...), Spark saves the partition field(s) as folder(s). This can be beneficial for reading data later, as (with some file types, parquet included) it can optimize to read data only from the partitions that you use (i.e. if you read and filter on centroid0 == 1, Spark won't read the other partitions).
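A minimal sketch of that pruning (assuming the same fpath used for the write above; the exact plan text varies by Spark version):

import org.apache.spark.sql.functions.col

val clusters = spark.read.parquet(fpath)
clusters.filter(col("centroid0") === 1).explain()
// The physical plan should report something like:
//   PartitionFilters: [isnotnull(centroid0), (centroid0 = 1)]
// i.e. only the centroid0=1 folder is scanned.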
The effect of this is that the partition fields (centroid0 in your case) are not written into the parquet files, only as folder names (centroid0=1, centroid0=2, etc.).
One side effect of this is that the type of the partition column is inferred at run time (since it is not part of the schema saved in the parquet files); in your case you happened to have only integer values, so it was inferred to be integer.
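If you want Spark to "change its mind" about those inferred types (the question's update), one switch to look at is partition-column type inference. A sketch, assuming you are happy for every partition column to come back as a string:

// With type inference disabled, partition values such as centroid0=0, centroid0=1
// are read back as StringType instead of being inferred as integers.
spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
val reread = spark.read.parquet(fpath)   // centroid0 is now a string column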
The other side effect is that the partition field is added at the end of the schema: Spark reads the schema from the parquet files as one chunk and then appends the partition field(s) to it as another (again, they are no longer part of the schema stored in the parquet files). That is the deterministic ordering: the file schema comes first, then the partition columns.
You can actually pretty easily make use of the ordering of the columns of a case class that holds the schema of your partitioned data. You will need to read the data from the path underneath which the partitioning columns are stored, so that Spark infers the values of those columns. Then simply re-order the columns by applying the case class schema, with a statement like:
import org.apache.spark.sql.{Encoder, Encoders}

// RecordType is your own case class describing a row, e.g. (hypothetical fields):
// case class RecordType(centroid0: String, feature1: Double, feature2: Double)
implicit val encoder: Encoder[RecordType] = Encoders.product[RecordType]

spark.read
  .schema(encoder.schema)
  .format("parquet")
  .option("mergeSchema", "true")
  .load(myPath)
  // re-order the columns: when reading partitioned data, the partitioning columns are put at the end
  .select(encoder.schema.fieldNames.head, encoder.schema.fieldNames.tail: _*)
  .as[RecordType]
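A nice side effect of passing encoder.schema on the read is that the partition column should also take its type from the case class (StringType for centroid0) instead of the run-time inferred IntegerType, so this approach addresses both of the surprising modifications at once.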