 

Refresh metadata for Dataframe while reading parquet file

I am trying to read a Parquet file as a dataframe, and the file is updated periodically (the path is /folder_name). Whenever new data arrives, the old Parquet path (/folder_name) is renamed to a temp path, then we union the new data with the old data and store the result back in the old path (/folder_name).

For example, suppose we have a Parquet file hdfs://folder_name/part-xxxx-xxx.snappy.parquet before the update; after the update it is changed to hdfs://folder_name/part-00000-yyyy-yyy.snappy.parquet.

The issue happens when I try to read the Parquet file while the update is in progress:

sparksession.read.parquet("filename") => it picks up the old path hdfs://folder_name/part-xxxx-xxx.snappy.parquet (the path exists at this point)

When an action is called on the dataframe, it tries to read the data from hdfs://folder_name/part-xxxx-xxx.snappy.parquet, but because of the update the filename has changed and I get the error below:

java.io.FileNotFoundException: File does not exist: hdfs://folder_name/part-xxxx-xxx.snappy.parquet It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.

I am using Spark 2.2

Can anyone help me with how to refresh the metadata?



1 Answer

That error occurs when you are trying to read a file that doesn't exist.

Correct me if I'm wrong, but I suspect you are overwriting all the files when you save the new dataframe (using .mode("overwrite")). While that write is running, you are trying to read a file that has already been deleted and that exception is thrown, which makes the table unavailable for a period of time (during the update).
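
As a rough illustration of the race (a minimal sketch; the paths and variable names are placeholders, not your actual job):

// Writer job: rewrites the whole folder
oldDf.union(newDf)
  .write
  .mode("overwrite")                 // deletes the old part-xxxx files and writes new ones
  .parquet("hdfs://folder_name")

// Reader job, running at the same time
val df = spark.read.parquet("hdfs://folder_name") // file listing is resolved here
df.count()                                        // action runs after the overwrite -> FileNotFoundException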

As far as I know, there is no direct way of "refreshing the metadata" in the way you want.

Two (of several possible) ways of solving this:

1 - Use append mode

If you just want to append the new dataframe to the old one, there is no need to create a temporary folder and overwrite the old one. You can just change the save mode from overwrite to append. This way you add new part files to the existing Parquet directory without having to rewrite the ones that are already there.

df.write
  .mode("append")       // append new files instead of replacing the existing ones
  .parquet("/temp_table")

This is by far the simplest solution and there is no need to read the data that was already stored. This, however, won't work if you have to update the old data (e.g. if you are doing an upsert). For that you have option 2:

2 - Use a Hive view

You can create Hive tables and use a view to point to the most recent (and available) one.

Here is an example of the logic behind this approach:

Part 1

  • If the view <table_name> does not exist, we create a new table called <table_name>_alpha0 to store the new data
  • After creating the table, we create a view <table_name> as select * from <table_name>_alpha0

Part 2

  • If the view <table_name> exists, we check which table it is currently pointing to (<table_name>_alphaN)

  • You do all the operations you want with the new data and save it as a table named <table_name>_alpha(N+1)

  • After creating the table, we alter the view <table_name> to select * from <table_name>_alpha(N+1)

And a code example:

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types._
import spark.implicits._ // requires a SparkSession instance named 'spark' in scope (see the example below)


//This method verifies if the view exists and returns the table it is pointing to (using the query 'describe formatted')

def getCurrentTable(spark: SparkSession, databaseName:String, tableName: String): Option[String] = {
  if(spark.catalog.tableExists(s"${databaseName}.${tableName}")) {

    val rdd_desc = spark.sql(s"describe formatted ${databaseName}.${tableName}")
      .filter("col_name == 'View Text'")
      .rdd

    if(rdd_desc.isEmpty()) {
      None
    }
    else {
      Option(
        rdd_desc.first()
          .get(1)
          .toString
          .toLowerCase
          .stripPrefix("select * from ")
      )
    }
  }
  else
    None
}

//This method saves a dataframe in the next "alpha table" and updates the view. It maintains 'rounds' tables (default=3). I.e. if the current table is alpha2, the next one will be alpha0 again.

def saveDataframe(spark: SparkSession, databaseName:String, tableName: String, new_df: DataFrame, rounds: Int = 3): Unit ={
  val currentTable = getCurrentTable(spark, databaseName, tableName).getOrElse(s"${databaseName}.${tableName}_alpha${rounds-1}")
  val nextAlphaTable = currentTable.replace(s"_alpha${currentTable.last}",s"_alpha${(currentTable.last.asDigit + 1) % rounds}") // asDigit: '2' -> 2 (Char.toInt would return the ASCII code instead)

  new_df.write
    .mode("overwrite")
    .format("parquet")
    .option("compression","snappy")
    .saveAsTable(nextAlphaTable)

  spark.sql(s"create or replace view ${databaseName}.${tableName} as select * from ${nextAlphaTable}")
}

//An example on how to use this:

//SparkSession: spark
val df = Seq((1,"I"),(2,"am"),(3,"a"),(4,"dataframe")).toDF("id","text")
val new_data = Seq((5,"with"),(6,"new"),(7,"data")).toDF("id","text")
val dbName = "test_db"
val tableName = "alpha_test_table"

println(s"Current table: ${getCurrentTable(spark, dbName, tableName).getOrElse("Table does not exist")}")
println("Saving dataframe")

saveDataframe(spark, dbName, tableName, df)

println("Dataframe saved")
println(s"Current table: ${getCurrentTable(spark, dbName, tableName).getOrElse("Table does not exist")}")
spark.read.table(s"${dbName}.${tableName}").show

val processed_df = df.unionByName(new_data) //Or other operations you want to do (note: unionByName needs Spark 2.3+; on 2.2 use union with the columns in the same order)

println("Saving new dataframe")
saveDataframe(spark, dbName, tableName, processed_df)

println("Dataframe saved")
println(s"Current table: ${getCurrentTable(spark, dbName, tableName).getOrElse("Table does not exist")}")
spark.read.table(s"${dbName}.${tableName}").show

Result:

Current table: Table does not exist
Saving dataframe
Dataframe saved
Current table: test_db.alpha_test_table_alpha0
+---+---------+
| id|     text|
+---+---------+
|  3|        a|
|  4|dataframe|
|  1|        I|
|  2|       am|
+---+---------+

Saving new dataframe
Dataframe saved
Current table: test_db.alpha_test_table_alpha1
+---+---------+
| id|     text|
+---+---------+
|  3|        a|
|  4|dataframe|
|  5|     with|
|  6|      new|
|  7|     data|
|  1|        I|
|  2|       am|
+---+---------+

By doing this you can guarantee that some version of the view <table_name> will always be available. This also has the advantage (or not, depending on your case) of keeping the previous versions of the table, i.e. the previous version of <table_name>_alpha1 will be <table_name>_alpha0.
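
As a side note, if you ever need to roll the view back to the previous version, a minimal sketch could look like this (it reuses getCurrentTable from above; the rotation arithmetic mirrors saveDataframe and is only illustrative):

// Point the view back at the previous "alpha" table (e.g. alpha1 -> alpha0, alpha0 -> alpha2)
def rollbackView(spark: SparkSession, databaseName: String, tableName: String, rounds: Int = 3): Unit = {
  val currentTable = getCurrentTable(spark, databaseName, tableName).getOrElse(sys.error("View does not exist"))
  val previousAlpha = (currentTable.last.asDigit + rounds - 1) % rounds
  val previousTable = currentTable.replace(s"_alpha${currentTable.last}", s"_alpha${previousAlpha}")
  spark.sql(s"create or replace view ${databaseName}.${tableName} as select * from ${previousTable}")
}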

3 - A bonus

If upgrading your Spark version is an option, take a look at Delta Lake (minimum Spark version: 2.4.2)
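
For reference, an upsert with Delta Lake looks roughly like this (a sketch assuming the delta-core package is on the classpath, the data has already been written in Delta format at /folder_name, and 'id' is just an example merge key; readers always see either the old or the new version of the table, never a half-written one):

import io.delta.tables.DeltaTable

val deltaTable = DeltaTable.forPath(spark, "/folder_name")

deltaTable.as("old")
  .merge(new_data.as("new"), "old.id = new.id") // update matching rows, insert the rest
  .whenMatched.updateAll()
  .whenNotMatched.insertAll()
  .execute()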

Hope this helps :)
