
How to overwrite a parquet file from where DataFrame is being read in Spark

This is a microcosm of the problem I am facing. Let me try to reproduce it here.

I am saving a DataFrame as a parquet file, but when I reload the DataFrame from that parquet file and save it again as parquet, I get an error.

from pyspark.sql.functions import col

valuesCol = [('Male','2019-09-06'),('Female','2019-09-06'),('Male','2019-09-07')]
df = spark.createDataFrame(valuesCol,['sex','date'])
# Save as parquet
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')

# Load it back
df = spark.read.format('parquet').load('.../temp')
df = df.where(col('sex')=='Male')
# Save it back - this produces the ERROR
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')

Error message -

executor 22): java.io.FileNotFoundException: Requested file maprfs:///mapr/.../temp/part-00000-f67d5a62-36f2-4dd2-855a-846f422e623f-c000.snappy.parquet does not exist. It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.

Another SO question addresses this issue, and the proposed solution was to refresh the table as in the code below, but that did not help. The issue is with refreshing the metadata, and I don't know how to refresh it.

df.createOrReplaceTempView('table_view')
spark.catalog.refreshTable('table_view')
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp')
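
For reference, PySpark also has a path-based refresh, spark.catalog.refreshByPath, which matches a DataFrame that was loaded directly from files rather than from a table. A minimal sketch, assuming the same '.../temp' path (I have not verified that it resolves this particular error):

# Invalidate the cached file listing/metadata for everything read from this path
spark.catalog.refreshByPath('.../temp')
df = spark.read.format('parquet').load('.../temp')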

Workaround for this problem: An inelegant way to solve this issue is to save the DataFrame as a parquet file with a different name, then delete the original parquet file, and finally rename the new parquet file to the old name.

# Workaround
import os
import shutil

# Load it back
df = spark.read.format('parquet').load('.../temp')

# Save it back as temp1, as opposed to original temp      
df.repartition(1).write.format('parquet').mode('overwrite').save('.../temp1')

# Delete the original parquet file
shutil.rmtree('.../temp')

# Renaming the parquet folder.
os.rename('.../temp1','.../temp')
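
Since the error message shows a maprfs:// path, os and shutil only work here because the cluster filesystem is also mounted locally. Below is a sketch of the same delete-and-rename done through the Hadoop FileSystem API via Spark's JVM gateway; spark._jsc and spark._jvm are internal PySpark attributes, so this is an assumption rather than a supported public API, and the paths are the same placeholders as above.

# Sketch: delete the old folder and rename the new one via the Hadoop FileSystem API
hadoop_conf = spark._jsc.hadoopConfiguration()
Path = spark._jvm.org.apache.hadoop.fs.Path
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(hadoop_conf)

fs.delete(Path('.../temp'), True)               # recursive delete of the original folder
fs.rename(Path('.../temp1'), Path('.../temp'))  # rename the new folder to the old name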

But the problem is that some DataFrames are quite big, so this may not be the best way to deal with it. I am also not sure whether renaming the folder will cause any problems with the metadata.

asked by cph_sto

2 Answers

One solution for this error is to cache the DataFrame, trigger an action on it (for example, df.show()), and then save the parquet file in "overwrite" mode. With the data cached, the write no longer has to re-read the parquet files that the overwrite is deleting.

In Python:

save_mode = "overwrite"
df = spark.read.parquet("path_to_parquet")

# ... apply your transformations to df, producing new_df

new_df.cache()
new_df.show()

new_df.write.format("parquet") \
    .mode(save_mode) \
    .save("path_to_parquet")
answered by Andre Aguiar

When the data is served from a cache, it seems to work fine.

val df = spark.read.format("parquet").load("temp").cache()

cache is a lazy operation and doesn't trigger any computation, so we have to add a dummy action.

println(df.count()) //count over parquet files should be very fast  

Now it should work:

import org.apache.spark.sql.SaveMode

df.repartition(1).write.mode(SaveMode.Overwrite).parquet("temp")
answered by Gelerion