
Prevent DataFrame.partitionBy() from removing partitioned columns from schema


I am partitioning a DataFrame as follows:

df.write.partitionBy("type", "category").parquet(config.outpath) 

The code gives the expected results (i.e. data partitioned by type & category). However, the "type" and "category" columns are removed from the data / schema. Is there a way to prevent this behaviour?

Michael asked Mar 22 '16


People also ask

What is dynamic partition overwrite?

A write that dynamically overwrites partitions removes all existing data in each logical partition for which the write will commit new data. Any existing logical partition for which the write does not contain data will remain unchanged.
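For example, a minimal sketch assuming Spark 2.3 or later (where the spark.sql.sources.partitionOverwriteMode setting exists) and reusing the df and config.outpath from the question:

// Enable dynamic partition overwrite so an overwrite replaces only the
// partitions that df actually contains; all other partitions are untouched.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

df.write
  .mode("overwrite")
  .partitionBy("type", "category")
  .parquet(config.outpath)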

Which method is used to reduce the number of partitions post processing in spark?

Spark RDD coalesce() is used only to reduce the number of partitions. It is an optimized version of repartition() in which the movement of data across partitions is kept lower.
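A one-line sketch, assuming df currently has more partitions than needed:

val reduced = df.coalesce(2)      // merges existing partitions down to 2, avoiding a full shuffle
reduced.rdd.getNumPartitions      // => 2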

How will you control the number of partitions in spark DataFrame across the application?

repartition() can be used for increasing or decreasing the number of partitions of a Spark DataFrame. However, repartition() involves shuffling which is a costly operation.
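For example (the target count and column below are illustrative):

val wider  = df.repartition(200)       // increase to 200 partitions; triggers a full shuffle
val byType = df.repartition($"type")   // repartition by column ($ requires import spark.implicits._)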

What is partitionBy in spark?

A PySpark partition is a way to split a large dataset into smaller datasets based on one or more partition keys. You can also partition on multiple columns using partitionBy(); just pass the columns you want to partition on as arguments to this method. Syntax: partitionBy(self, *cols)
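Concretely, partitioning on two columns encodes their values in the directory names rather than in the data files themselves (the paths below are illustrative), which is also why those columns vanish from the per-file schema:

df.write.partitionBy("type", "category").parquet("/tmp/events")
// resulting layout on disk:
//   /tmp/events/type=book/category=fiction/part-00000-....parquet
//   /tmp/events/type=book/category=science/part-00001-....parquet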


2 Answers

I can think of one workaround, which is rather lame, but works.

import spark.implicits._

val duplicated = df.withColumn("_type", $"type").withColumn("_category", $"category")
duplicated.write.partitionBy("_type", "_category").parquet(config.outpath)

Since I have the same question, I'm posting this in the hope that someone has a better answer or explanation than what I have (or that the OP has since found a better solution).

Ivan Gozali answered Sep 24 '22


In general, Ivan's answer is a fine kludge. But...

If you are strictly reading and writing with Spark, you can just use the basePath option when reading your data back.

https://spark.apache.org/docs/2.2.0/sql-programming-guide.html#partition-discovery

By passing path/to/table to either SparkSession.read.parquet or SparkSession.read.load, Spark SQL will automatically extract the partitioning information from the paths.

Example:

val dataset = spark
  .read
  .format("parquet")
  .option("basePath", hdfsInputBasePath)
  .load(hdfsInputPath)
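With basePath pointing at the root of the table, partition discovery recovers type and category from the directory names, so they reappear in the schema of dataset. A quick sanity check (the output shape is illustrative):

dataset.printSchema()
// root
//  |-- ... data columns read from the parquet files ...
//  |-- type: string
//  |-- category: string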
Robert Beatty answered Sep 22 '22