I am partitioning a DataFrame as follows:
df.write.partitionBy("type", "category").parquet(config.outpath)
The code gives the expected results (i.e. data partitioned by type & category). However, the "type" and "category" columns are removed from the data / schema. Is there a way to prevent this behaviour?
A write that dynamically overwrites partitions removes all existing data in each logical partition for which the write will commit new data. Any existing logical partition for which the write does not contain data will remain unchanged.
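For example, a minimal sketch of enabling dynamic partition overwrite (available from Spark 2.3 onward), reusing the df and config.outpath names from the question; the config key and behaviour are standard Spark, but whether this fits your pipeline is an assumption:

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

df.write
  .mode("overwrite")
  .partitionBy("type", "category")
  .parquet(config.outpath)
// Only the (type, category) partitions for which df holds rows are replaced;
// any other partitions already under config.outpath are left untouched.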
Spark's coalesce() is used only to reduce the number of partitions. It is an optimized variant of repartition() in which data movement across partitions is kept to a minimum.
repartition() can be used to either increase or decrease the number of partitions of a Spark DataFrame. However, repartition() involves a full shuffle, which is a costly operation.
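For example (a minimal sketch, assuming df is any existing DataFrame):

val merged = df.coalesce(2)        // only reduces the partition count; avoids a full shuffle
val rebalanced = df.repartition(8) // can increase or decrease the count; triggers a full shuffle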
A PySpark partition is a way to split a large dataset into smaller datasets based on one or more partition keys. You can also partition on multiple columns using partitionBy(); just pass the columns you want to partition by as arguments to this method. Syntax: partitionBy(self, *cols)
I can think of one workaround, which is rather lame, but works.
import spark.implicits._

val duplicated = df
  .withColumn("_type", $"type")
  .withColumn("_category", $"category")

duplicated.write.partitionBy("_type", "_category").parquet(config.outpath)
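Reading the output back should then give you both sets of columns: type and category from the Parquet files themselves, and _type and _category inferred from the directory names. A quick check (a sketch, assuming the same config.outpath as above):

val readBack = spark.read.parquet(config.outpath)
readBack.printSchema() // expected to contain type, category, _type and _category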
I'm answering this in the hope that someone has a better answer or explanation than I do (or that the OP has since found a better solution), since I ran into the same question.
In general, Ivan's answer is a fine kludge. BUT...
If you are strictly reading and writing in Spark, you can simply use the basePath option when reading your data.
https://spark.apache.org/docs/2.2.0/sql-programming-guide.html#partition-discovery
By passing path/to/table to either SparkSession.read.parquet or SparkSession.read.load, Spark SQL will automatically extract the partitioning information from the paths.
Example:
val dataset = spark
  .read
  .format("parquet")
  .option("basePath", hdfsInputBasePath)
  .load(hdfsInputPath)
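With basePath set, partition discovery keeps the partition columns in the schema even when you load only a subdirectory. A quick check (a sketch, assuming hdfsInputPath points at a subdirectory such as a single type=.../category=... partition under hdfsInputBasePath):

dataset.printSchema() // type and category are recovered from the directory names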