I have partitioned data in HDFS. At some point I decide to update it. The algorithm is: read the new data from a Kafka topic, find out which partitions the new data belongs to, load the existing data for those partitions from HDFS, merge it with the new data, and overwrite those partitions on disk.
The problem is: what if the new data contains partitions that don't exist on disk yet? In that case they don't get written. The solution in https://stackoverflow.com/a/49691528/10681828, for example, doesn't write new partitions.
The above picture describes the situation. Think of the left disk as the partitions that are already in HDFS and the right disk as the partitions we just received from Kafka.
Some of the partitions on the right disk intersect with the already existing ones, the others don't. And this code:
// Enable dynamic partition overwrite (Spark 2.3+): only the partitions
// present in the DataFrame are supposed to be replaced.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

dataFrame
  .write
  .mode(SaveMode.Overwrite)
  .partitionBy("date", "key")
  .option("header", "true")
  .format(format)
  .save(path)
is not able to write the blue part of the picture to disk.
So, how do I resolve this issue? Please provide code. I am looking for something performant.
An example for those who don't understand:
Suppose we have this data in HDFS:
Now we receive this new data:
So, partitions A and B are already in HDFS, and partitions B and C arrive in the new data. Since B already exists in HDFS, it gets updated, and I want C to be written as well. The end result should look like this:
But if I use the code from above, I get this:
Because the new dynamic overwrite feature introduced in Spark 2.3 is not able to create partition C.
Update: It turns out that if you use Hive tables instead, this will work. But with pure Spark it doesn't... So I guess Hive's overwrite and Spark's overwrite work differently.
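For reference, a minimal sketch of that Hive-table variant, assuming a partitioned Hive table already exists with matching schema and partition columns; the table name my_db.events and the staging path are made up for illustration:

import org.apache.spark.sql.{SaveMode, SparkSession}

object HiveDynamicOverwrite {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hive-dynamic-overwrite")
      .enableHiveSupport() // required so insertInto targets the Hive metastore table
      .getOrCreate()

    // Only overwrite the partitions that appear in the incoming DataFrame.
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

    // Hypothetical source of the new batch.
    val dataFrame = spark.read.parquet("/staging/new_batch")

    // insertInto matches columns by position, so the DataFrame's columns must be
    // in the table's order, with the partition columns (date, key) last.
    // Do not combine insertInto with partitionBy.
    dataFrame
      .write
      .mode(SaveMode.Overwrite)
      .insertInto("my_db.events")
  }
}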
In the end I just decided to delete that "green" subset of partitions from HDFS and use SaveMode.Append instead. I think this is a bug in Spark.
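For completeness, here is a sketch of that workaround under the same layout as the question's code (base path and partition columns date/key come from the snippet above; the helper name and the parquet output format are assumptions): first delete the partition directories that the new batch touches via the Hadoop FileSystem API, then append the whole batch.

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object DeleteThenAppend {
  def overwriteTouchedPartitions(spark: SparkSession, dataFrame: DataFrame, basePath: String): Unit = {
    val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)

    // Find every (date, key) combination present in the new batch.
    val touched = dataFrame.select("date", "key").distinct().collect()

    // Delete the matching partition directories so the Append below
    // doesn't duplicate the rows that are being replaced.
    touched.foreach { row =>
      val dir = new Path(s"$basePath/date=${row.get(0)}/key=${row.get(1)}")
      if (fs.exists(dir)) fs.delete(dir, true)
    }

    // Append now writes both the refreshed partitions and the brand-new ones.
    dataFrame
      .write
      .mode(SaveMode.Append)
      .partitionBy("date", "key")
      .option("header", "true")
      .format("parquet")
      .save(basePath)
  }
}

Note that this is not atomic: if the job dies between the delete and the append, the deleted partitions stay missing until the batch is rerun.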