
Append a new column to an existing parquet file

Is there any way to append a new column to an existing parquet file?

I'm currently working on a kaggle competition, and I've converted all the data to parquet files.

Here is my case: I read the parquet file into a PySpark DataFrame, did some feature extraction, and appended new columns to the DataFrame with

pyspark.DataFrame.withColumn().

After that, I want to save the new columns in the source parquet file.

I know Spark SQL comes with Parquet schema evolution, but the example only shows the case with a key-value pair.

The parquet "append" mode doesn't do the trick either. It only append new rows to the parquet file. If there's anyway to append a new column to an existing parquet file instead of generate the whole table again? Or I have to generate a separate new parquet file and join them on the runtime.

Chu-Yu Hsu asked Aug 04 '15

People also ask

How do I add a column in Parquet?

You do not modify columns in place. You read, change, then re-write. One way to do this in Hive query language: select the parquet data into a non-parquet table, do your work to modify the new table (update the new column, etc.), then select back into a new parquet table with the new schema.
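A rough PySpark sketch of that read-modify-rewrite flow (the paths and the derived column are placeholders, not from the question):

from pyspark.sql import functions as F

df = spark.read.parquet('/data/events')                          # read the existing parquet data
df_new = df.withColumn('event_year', F.year(df['event_date']))   # "do your work": add or modify columns

# re-write to a NEW location with the new schema; overwriting the path you are still reading from is unsafe
df_new.write.mode('overwrite').parquet('/data/events_v2')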

Can you append to a Parquet file?

Parquet slices columns into chunks and allows parts of a column to be stored in several chunks within a single file, thus append is possible.
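In PySpark that kind of append looks roughly like this (the path is a placeholder); note that it adds rows, not columns:

new_rows = spark.createDataFrame([('4', 'Name_4', 'Address_4')], ['ID', 'Name', 'Address'])
new_rows.write.mode('append').parquet('/data/people')   # appends new row groups/files with the same schema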

How do I add a column to an existing Spark DataFrame?

In PySpark, to add a new column to a DataFrame use the lit() function, imported with from pyspark.sql.functions import lit. lit() takes the constant value you want to add and returns a Column type; if you want to add a NULL / None, use lit(None).
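For example (the DataFrame and the added columns here are just illustrative):

from pyspark.sql.functions import lit

df = spark.createDataFrame([(1, 'a'), (2, 'b')], ['id', 'val'])
df.withColumn('source', lit('kaggle')).show()              # constant column
df.withColumn('extra', lit(None).cast('string')).show()    # explicit NULL column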

Can you update a Parquet file?

TL;DR: parquet-rewriter is a way to update parquet files by rewriting them in a more efficient manner.


1 Answer

Yes, it is possible with both Databricks Delta and parquet tables. An example is given below:

This example is written in Python (PySpark):

df = sqlContext.createDataFrame([('1','Name_1','Address_1'),('2','Name_2','Address_2'),('3','Name_3','Address_3')], schema=['ID', 'Name', 'Address'])

delta_tblNm = 'testDeltaSchema.test_delta_tbl'
parquet_tblNm = 'testParquetSchema.test_parquet_tbl'

delta_write_loc = 'dbfs:///mnt/datalake/stg/delta_tblNm'
parquet_write_loc = 'dbfs:///mnt/datalake/stg/parquet_tblNm'


# DELTA TABLE
df.write.format('delta').mode('overwrite').option('overwriteSchema', 'true').save(delta_write_loc)
spark.sql(" create table if not exists {} using DELTA LOCATION '{}'".format(delta_tblNm, delta_write_loc))
spark.sql("refresh table {}".format(print(cur_tblNm)))

# PARQUET TABLE
df.write.format("parquet").mode("overwrite").save(parquet_write_loc)
spark.sql("""CREATE TABLE if not exists {} USING PARQUET LOCATION '{}'""".format(parquet_tblNm, parquet_write_loc))
spark.sql(""" REFRESH TABLE {} """.format(parquet_tblNm))

test_df = spark.sql("select * testDeltaSchema.test_delta_tbl")
test_df.show()

test_df = spark.sql("select * from testParquetSchema.test_parquet_tbl")
test_df.show()

test_df = spark.sql("ALTER TABLE  testDeltaSchema.test_delta_tbl ADD COLUMNS (Mob_number String COMMENT 'newCol' AFTER Address)")
test_df.show()

test_df = spark.sql("ALTER TABLE  testParquetSchema.test_parquet_tbl ADD COLUMNS (Mob_number String COMMENT 'newCol' AFTER Address)")
test_df.show()
Sandy answered Sep 21 '22