Is there any way to append a new column to an existing parquet file?
I'm currently working on a kaggle competition, and I've converted all the data to parquet files.
Here is the case: I read the parquet file into a PySpark DataFrame, did some feature extraction, and appended new columns to the DataFrame with
pyspark.DataFrame.withColumn().
After that, I want to save the new columns back to the source parquet file.
I know Spark SQL comes with Parquet schema evolution, but the examples only show the case with a key-value pair.
The parquet "append" mode doesn't do the trick either; it only appends new rows to the parquet file. Is there any way to append a new column to an existing parquet file instead of regenerating the whole table? Or do I have to generate a separate new parquet file and join them at runtime?
You do not modify columns in place. You read, change, then re-write. One way to accomplish this in Hive query language: select the parquet data into a non-parquet table, do your work to modify the new table (update the new column, etc.), then select it back into a new parquet table with the new schema.
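For the PySpark workflow described in the question, a minimal sketch of that read-transform-rewrite cycle could look like the following; the paths are placeholders and length() stands in for whatever feature extraction you actually do:

from pyspark.sql import SparkSession
from pyspark.sql.functions import length

spark = SparkSession.builder.getOrCreate()

# Read the existing parquet data (source path is a placeholder)
df = spark.read.parquet("/data/source_table")

# Derive the new column; length() stands in for your feature extraction
df = df.withColumn("name_length", length(df["Name"]))

# Write to a NEW location rather than overwriting the path you are still
# reading from in the same job, then swap the directories afterwards
df.write.mode("overwrite").parquet("/data/source_table_v2")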
Parquet slices columns into chunks and allows parts of a column to be stored in several chunks within a single file, so appending new rows is possible.
In PySpark, to add a new column to a DataFrame, use the lit() function (from pyspark.sql.functions import lit). lit() takes the constant value you want to add and returns a Column type; if you want to add a NULL / None, use lit(None).
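As a quick illustration (assuming df is the DataFrame from the question; the column names and values are arbitrary):

from pyspark.sql.functions import lit

# Add a constant column and an all-NULL string column
df = df.withColumn("country", lit("US")) \
       .withColumn("Mob_number", lit(None).cast("string"))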
TL;DR: Parquet-rewriter is a way to update parquet files by rewriting them in a more efficient manner.
Yes, it is possible with both Databricks Delta and with parquet tables. An example is given below.
This example is written in Python (PySpark):
df = sqlContext.createDataFrame([('1','Name_1','Address_1'),('2','Name_2','Address_2'),('3','Name_3','Address_3')], schema=['ID', 'Name', 'Address'])
delta_tblNm = 'testDeltaSchema.test_delta_tbl'
parquet_tblNm = 'testParquetSchema.test_parquet_tbl'
delta_write_loc = 'dbfs:///mnt/datalake/stg/delta_tblNm'
parquet_write_loc = 'dbfs:///mnt/datalake/stg/parquet_tblNm'
# DELTA TABLE
df.write.format('delta').mode('overwrite').option('overwriteSchema', 'true').save(delta_write_loc)
spark.sql(" create table if not exists {} using DELTA LOCATION '{}'".format(delta_tblNm, delta_write_loc))
spark.sql("refresh table {}".format(print(cur_tblNm)))
# PARQUET TABLE
df.write.format("parquet").mode("overwrite").save(parquet_write_loc)
spark.sql("""CREATE TABLE if not exists {} USING PARQUET LOCATION '{}'""".format(parquet_tblNm, parquet_write_loc))
spark.sql(""" REFRESH TABLE {} """.format(parquet_tblNm))
test_df = spark.sql("select * testDeltaSchema.test_delta_tbl")
test_df.show()
test_df = spark.sql("select * from testParquetSchema.test_parquet_tbl")
test_df.show()
test_df = spark.sql("ALTER TABLE testDeltaSchema.test_delta_tbl ADD COLUMNS (Mob_number String COMMENT 'newCol' AFTER Address)")
test_df.show()
test_df = spark.sql("ALTER TABLE testParquetSchema.test_parquet_tbl ADD COLUMNS (Mob_number String COMMENT 'newCol' AFTER Address)")
test_df.show()
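After the ALTER, the new column exists in the schema but is NULL for existing rows. For the Delta table you can then fill it in with an UPDATE statement; this is a sketch assuming Delta Lake's SQL UPDATE support is available in your environment, and the value and filter are placeholders. For the plain parquet table you would re-write the data as described earlier.

spark.sql("UPDATE testDeltaSchema.test_delta_tbl SET Mob_number = '9999999999' WHERE ID = '1'")
spark.sql("select * from testDeltaSchema.test_delta_tbl").show()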