Consider the following code:
import org.apache.spark.sql.hive.orc._
import org.apache.spark.sql._
val path = ...
val dataFrame: DataFrame = ...
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sparkContext)
dataFrame.createOrReplaceTempView("my_table")
val results = hiveContext.sql(s"select * from my_table")
results.write.mode(SaveMode.Append).partitionBy("my_column").format("orc").save(path)
hiveContext.sql("REFRESH TABLE my_table")
This code is executed twice with the same path but different DataFrames. The first run succeeds, but subsequent runs raise an error:
Caused by: java.io.FileNotFoundException: File does not exist: hdfs://somepath/somefile.snappy.orc
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
I have tried clearing the cache and invoking hiveContext.dropTempTable("tableName"), but nothing has any effect. When should REFRESH TABLE tableName be called (before the write, after it, or some other variant) to fix this error?
The REFRESH TABLE statement invalidates the cached entries, which include the data and metadata of the given table or view. The invalidated cache is repopulated lazily, the next time the cached table or a query associated with it is executed.
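Both the SQL statement and the Catalog API trigger the same invalidation, and the cache is only rebuilt by the next action that touches the table. A minimal sketch of that behaviour, assuming a Spark 2.x session named spark and a table or view named my_table:
// Either call drops the cached metadata and file listing for my_table
spark.sql("REFRESH TABLE my_table")
spark.catalog.refreshTable("my_table")
// The cache is repopulated lazily, i.e. by the next query that reads my_table
val refreshed = spark.sql("select * from my_table")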
You can run spark.catalog.refreshTable(tableName)
or spark.sql(s"REFRESH TABLE $tableName")
just before the write operation. I had the same problem and this fixed it:
// Invalidate the stale cached metadata/file listing before writing again
spark.catalog.refreshTable(tableName)
df.write.mode(SaveMode.Overwrite).insertInto(tableName)
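Applied to the code from the question (which writes with save(path) rather than insertInto), the same idea would look roughly like this; this is a sketch only, assuming my_table is the temp view registered in the question and that the refresh runs on every execution before the write:
dataFrame.createOrReplaceTempView("my_table")
val results = hiveContext.sql("select * from my_table")
// Invalidate any file listing cached by a previous run before touching `path` again
hiveContext.sql("REFRESH TABLE my_table")
results.write.mode(SaveMode.Append).partitionBy("my_column").format("orc").save(path)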