 

Partition column is moved to end of row when saving a file to Parquet

For a given DataFrame, just before being saved to Parquet, here is the schema: notice that centroid0 is the first column and is StringType:

[schema screenshot: centroid0 is the first column, with type StringType]

However when saving the file using:

      df.write.partitionBy(dfHolder.metadata.partitionCols: _*).format("parquet").mode("overwrite").save(fpath)

and with the partitionCols as centroid0:

[screenshot: partitionCols containing centroid0]

then there is a (to me) surprising result:

  • the centroid0 partition column has been moved to the end of the row
  • its data type has been changed to Integer

I confirmed the output path via println:

 path=/git/block/target/scala-2.11/test-classes/data/output/blocking/out//level1/clusters

And here is the schema upon reading back from the saved parquet:

[schema screenshot after reading back: centroid0 is now the last column and is IntegerType]

Why are those two modifications to the input schema occurring, and how can they be avoided, while still keeping centroid0 as a partitioning column?

Update: a preferred answer should mention why/when the partitions are added to the end (vs. the beginning) of the columns list. We need an understanding of the deterministic ordering.

In addition, is there any way to cause Spark to "change its mind" on the inferred column types? I have had to change the partition values from 0, 1, etc. to c0, c1, etc. in order to get the inference to map to StringType. Maybe that was required, but if there is some Spark setting that changes this behavior, that would make for an excellent answer.

asked Jun 21 '18 by WestCoastProjects

2 Answers

When you write.partitionBy(...), Spark saves the partition field(s) as folder(s). This can be beneficial for reading the data later, because (with some file types, Parquet included) Spark can optimize to read data only from the partitions you actually use (i.e. if you read and filter for centroid0 == 1, Spark won't read the other partitions).

The effect of this is that the partition fields (centroid0 in your case) are not written into the Parquet files at all; they exist only as folder names (centroid0=1, centroid0=2, etc.).
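For example, the output directory for a write partitioned by centroid0 ends up looking roughly like this (folder and file names here are illustrative, not taken from the question):

    clusters/
        _SUCCESS
        centroid0=c0/
            part-00000-<uuid>.snappy.parquet
        centroid0=c1/
            part-00000-<uuid>.snappy.parquet

The Parquet files inside each folder contain every column except centroid0; its value is carried entirely by the folder name.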

There are two side effects. The first is that the type of the partition column is inferred at read time (since it is not saved in the Parquet schema), and in your case all the folder values happened to be integers, so it was inferred as Integer.
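That inference can be switched off. A minimal sketch, assuming a SparkSession named spark and the output path fpath from the question:

    // Disable partition column type inference, so partition values read
    // from folder names always come back as StringType.
    spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false")

    // centroid0 should now be a string even for folder values like 0, 1, ...
    spark.read.parquet(fpath).printSchema()

With this setting disabled, there is no need to rename partition values from 0, 1 to c0, c1 just to keep them as strings.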

The other side effect is that the partition field is added at the end of the schema: Spark reads the schema from the Parquet files as one chunk and then appends the partition field(s) to it as another (again, they are no longer part of the schema stored in the Parquet files).
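A minimal sketch of the round trip (the schema printout is indicative of the ordering, not an exact transcript):

    // write: centroid0 is the first column of df
    df.write.partitionBy("centroid0").format("parquet").mode("overwrite").save(fpath)

    // read back: Spark lists the file columns first, then appends the
    // partition column recovered from the folder names
    spark.read.parquet(fpath).printSchema()
    // root
    //  |-- <the non-partition columns, in file order>
    //  |-- centroid0: integer   (type inferred from folder names)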

answered Sep 28 '22 by Arnon Rotem-Gal-Oz

You can actually make use of the column ordering of a case class that holds the schema of your partitioned data fairly easily. You need to read the data from the path under which the partitioning columns are stored, so that Spark infers the values of those columns. Then simply re-order the columns using the case class schema, with a statement like:

import org.apache.spark.sql.{Encoder, Encoders}

// RecordType is a case class describing the desired schema, e.g.
// case class RecordType(centroid0: String, ...)
val encoder: Encoder[RecordType] = Encoders.product[RecordType]

val ds = spark.read
  .schema(encoder.schema)   // user-specified schema overrides inference, so centroid0 keeps its declared type
  .format("parquet")
  .option("mergeSchema", "true")
  .load(myPath)
  // reorder columns: when reading partitioned data, the partitioning
  // columns are put at the end
  .select(encoder.schema.fieldNames.head, encoder.schema.fieldNames.tail: _*)
  .as[RecordType]
answered Sep 28 '22 by Felix.Reuthlinger