This is a microcosm of the problem I am facing, where I am getting an error. Let me try to reproduce it here.
I am saving a DataFrame as a parquet file, but when I reload the DataFrame from the parquet file and save it again as parquet, I get an error.
from pyspark.sql.functions import col

valuesCol = [('Male','2019-09-06'),('Female','2019-09-06'),('Male','2019-09-07')]
df = spark.createDataFrame(valuesCol,['sex','date'])
# Save as parquet
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')
# Load it back
df = spark.read.format('parquet').load('.../temp')
df = df.where(col('sex')=='Male')
# Save it back - This produces ERROR
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')
Error message -
executor 22): java.io.FileNotFoundException: Requested file maprfs:///mapr/.../temp/part-00000-f67d5a62-36f2-4dd2-855a-846f422e623f-c000.snappy.parquet does not exist. It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
Another SO question addresses this issue. The proposed solution was to refresh the table as in the code below, but that did not help. The issue seems to be with refreshing the metadata, and I don't know how to refresh it properly.
df.createOrReplaceTempView('table_view')
spark.catalog.refreshTable('table_view')
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')
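For completeness, Spark also exposes a path-based refresh, spark.catalog.refreshByPath (Spark 2.2+), which targets the cached metadata of a path rather than a named temp view. A minimal sketch of that variant, using the same elided path; it may well fail for the same underlying reason:
# Refresh the cached file listing for the path the DataFrame was loaded from
spark.catalog.refreshByPath('.../temp')
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')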
Workaround for this problem: A non-elegant way to solve this issue is to save the DataFrame as a parquet file with a different name, then delete the original parquet file, and finally rename this parquet file to the old name.
# Workaround
import os
import shutil
# Load it back
df = spark.read.format('parquet').load('.../temp')
# Save it back as temp1, as opposed to original temp
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp1')
# Delete the original parquet file
shutil.rmtree('.../temp')
# Renaming the parquet folder.
os.rename('.../temp1','.../temp')
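Since the error path is on maprfs://, the shutil and os calls above assume the directory is also visible on the local filesystem (for example through an NFS mount). If it is not, the same delete and rename could presumably be done through the Hadoop FileSystem API instead; a rough sketch that relies on py4j and Spark's private _jvm/_jsc attributes:
# Rough sketch: delete/rename on the distributed filesystem via the Hadoop API
hadoop = spark.sparkContext._jvm.org.apache.hadoop.fs
conf = spark.sparkContext._jsc.hadoopConfiguration()
fs = hadoop.FileSystem.get(conf)
fs.delete(hadoop.Path('.../temp'), True)   # recursive delete of the original data
fs.rename(hadoop.Path('.../temp1'), hadoop.Path('.../temp'))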
But some DataFrames are quite big, so this may not be the best way to deal with it. I am also not sure whether the renaming will cause problems with the metadata.
One solution for this error is to cache the DataFrame, trigger an action on it (for example, df.show()), and then save the parquet file in "overwrite" mode.
In Python:
save_mode = "overwrite"
df = spark.read.parquet("path_to_parquet")
# ... apply your transformations to df here, producing new_df
new_df.cache()
new_df.show()
new_df.write.format("parquet")\
.mode(save_mode)\
.save("path_to_parquet")
When data is taken out of a cache it seems to work fine.
val df = spark.read.format("parquet").load("temp").cache()
cache is a lazy operation and doesn't trigger any computation, so we have to add a dummy action. The action materializes the cached data before the overwrite deletes the underlying files.
println(df.count()) //count over parquet files should be very fast
Now it should work:
import org.apache.spark.sql.SaveMode

df.repartition(1).write.mode(SaveMode.Overwrite).parquet("temp")