Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How do I upsert into HDFS with spark?

I have partitioned data in the HDFS. At some point I decide to update it. The algorithm is:

  • Read the new data from a kafka topic.
  • Find out new data's partition names.
  • Load the data from partitions with these names that is in the HDFS.
  • Merge the HDFS data with the new data.
  • Overwrite partitions that are already on disk.

The problem is that what if the new data has partitions that don't exist on disk yet. In that case they don't get written. https://stackoverflow.com/a/49691528/10681828 <- this solution doesn't write new partitions for example. enter image description here

The above picture describes the situation. Let's think of the left disk as being the partitions that are already in HDFS and of the right disk as partitions that we just received from Kafka.

Some of the partitions of the right disk will intersect with the already existing ones, the others won't. And this code:

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
dataFrame
    .write
    .mode(SaveMode.Overwrite)
    .partitionBy("date", "key")
    .option("header", "true")
    .format(format)
    .save(path)

is not able to write the blue part of the picture to disk.

So, how do I resolve this issue? Please provide code. I am looking for something performant.

An example for those who don't understand:

Suppose we have this data in the HDFS:

  • PartitionA has data "1"
  • PartitionB has data "1"

Now we receive this new data:

  • PartitionB has data "2"
  • PartitionC has data "1"

So, partitions A and B are in the HDFS, and partitions B and C are the new ones, and since B is in the HDFS we update it. And I want C to be written. So the end result should look like this:

  • PartitionA has data "1"
  • PartitionB has data "2"
  • PartitionC has data "1"

But If I use the code from above, I get this:

  • PartitionA has data "1"
  • PartitionB has data "2"

Because the new feature overwrite dynamic from spark 2.3 is not able to create PartitionC.

Update: It turns out that if you use hive tables instead, this will work. But if you use pure spark it doesn't... So, I guess hive's overwrite and spark's overwrite work differently.

like image 462
pavel_orekhov Avatar asked Jan 18 '19 00:01

pavel_orekhov


People also ask

What is spark coalesce?

PySpark Coalesce is a function in PySpark that is used to work with the partition data in a PySpark Data Frame. The Coalesce method is used to decrease the number of partitions in a Data Frame; The coalesce function avoids the full shuffling of data.

How do you write an update statement in PySpark?

You can do update a PySpark DataFrame Column using withColum(), select() and sql(), since DataFrame's are distributed immutable collection you can't really change the column values however when you change the value using withColumn() or any approach, PySpark returns a new Dataframe with updated values.


1 Answers

In the end I just decided to delete that "green" subset of partitions from HDFS, and use SaveMode.Append instead. I think this is a bug in spark.

like image 75
pavel_orekhov Avatar answered Sep 18 '22 06:09

pavel_orekhov